Source code for labgrid.driver.usbvideodriver

# pylint: disable=no-member
import logging
import subprocess

import attr

from ..exceptions import InvalidConfigError
from ..factory import target_factory
from ..protocol import VideoProtocol
from .common import Driver


# Caps variants used by more than one entry below.
# Each item is (quality name, caps string).
_JPEG_LOW_RATE_VARIANTS = (
    ("low", "image/jpeg,width=640,height=360,framerate=5/1"),
    ("mid", "image/jpeg,width=1280,height=720,framerate=15/2"),
    ("high", "image/jpeg,width=1920,height=1080,framerate=10/1"),
)
_JPEG_30FPS_VARIANTS = (
    ("low", "image/jpeg,width=640,height=480,framerate=30/1"),
    ("mid", "image/jpeg,width=1280,height=720,framerate=30/1"),
    ("high", "image/jpeg,width=1920,height=1080,framerate=30/1"),
)

# Single source of truth for device-specific settings, keyed by
# (vendor_id, model_id). Each value is a 3-tuple:
#   (caps variants, default extra-controls or None, parser element or None)
# A parser element of None means the frames are forwarded as they are.
_DEVICE_PROFILES = {
    (0x046d, 0x082d): (
        (
            ("low", "video/x-h264,width=640,height=360,framerate=5/1"),
            ("mid", "video/x-h264,width=1280,height=720,framerate=15/2"),
            ("high", "video/x-h264,width=1920,height=1080,framerate=10/1"),
        ),
        "focus_auto=1",
        "h264parse",
    ),
    (0x046d, 0x0892): (_JPEG_LOW_RATE_VARIANTS, "focus_auto=1", None),
    # Logitech HD Pro Webcam C920
    (0x046d, 0x08e5): (_JPEG_LOW_RATE_VARIANTS, "focus_auto=1", None),
    # LogiLink UA0371
    (0x1224, 0x2825): (_JPEG_30FPS_VARIANTS, None, None),
    # WansView Webcam 102
    (0x05a3, 0x9331): (
        (
            ("low", "video/x-h264,width=640,height=360,framerate=30/1"),
            ("mid", "video/x-h264,width=1280,height=720,framerate=30/1"),
            ("high", "video/x-h264,width=1920,height=1080,framerate=30/1"),
        ),
        None,
        "h264parse",
    ),
    # MacroSilicon
    (0x534d, 0x2109): (
        (
            ("low", "image/jpeg,width=720,height=480,framerate=10/1"),
            ("mid", "image/jpeg,width=1280,height=720,framerate=10/1"),
            ("high", "image/jpeg,width=1920,height=1080,framerate=10/1"),
        ),
        None,
        None,
    ),
    # HD 2MP WEBCAM
    (0x1d6c, 0x0103): (
        (
            ("low", "video/x-h264,width=640,height=480,framerate=25/1"),
            ("mid", "video/x-h264,width=1280,height=720,framerate=25/1"),
            ("high", "video/x-h264,width=1920,height=1080,framerate=25/1"),
        ),
        "focus_auto=1",
        "h264parse",
    ),
}

# Used for any device that has no entry in _DEVICE_PROFILES.
_FALLBACK_PROFILE = (_JPEG_30FPS_VARIANTS, None, None)

# Quality name returned as the default by get_qualities().
_DEFAULT_QUALITY = "mid"

# Seconds to wait for a terminated child before escalating to kill().
_STOP_TIMEOUT = 5.0


def _reap(proc):
    """Wait for an already-terminated *proc*, killing it if it does not exit."""
    try:
        proc.wait(timeout=_STOP_TIMEOUT)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


@target_factory.reg_driver
@attr.s(eq=False)
class USBVideoDriver(Driver, VideoProtocol):
    """Stream video from a bound USB video resource via ``gst-launch-1.0``.

    Device-specific caps, default controls and parser elements are looked up
    by the resource's ``(vendor_id, model_id)`` pair; unknown devices use a
    JPEG fallback profile.
    """

    bindings = {
        "video": {"USBVideo", "NetworkUSBVideo"},
    }

    def __attrs_post_init__(self):
        """Set up the per-instance logger and the ``_prepared`` flag."""
        super().__attrs_post_init__()
        self.logger = logging.getLogger(f"{self}")
        self._prepared = False

    def get_qualities(self):
        """Return the available caps variants for the bound device.

        Returns:
            A tuple ``(default, variants)`` where ``default`` is the name of
            the default quality and ``variants`` is a new list of
            ``(name, caps)`` tuples.

        A warning is logged when the device is not in the profile table and
        the fallback variants are returned instead.
        """
        match = (self.video.vendor_id, self.video.model_id)
        profile = _DEVICE_PROFILES.get(match)
        if profile is None:
            self.logger.warning(
                "Unknown USB video device %04x:%04x, using fallback pipeline.",
                *match,
            )
            profile = _FALLBACK_PROFILE
        variants = profile[0]
        # Hand out a fresh list so callers cannot mutate the shared table.
        return (_DEFAULT_QUALITY, list(variants))

    def select_caps(self, hint=None):
        """Return the caps string for the quality named by *hint*.

        Args:
            hint: Quality name to select; a falsy value selects the default
                quality reported by :meth:`get_qualities`.

        Raises:
            InvalidConfigError: If no variant with the requested name exists.
        """
        default, variants = self.get_qualities()
        variant = hint if hint else default
        for name, caps in variants:
            if name == variant:
                return caps
        raise InvalidConfigError(
            f"Unknown video format {variant} for device {self.video.vendor_id:04x}:{self.video.model_id:04x}"  # pylint: disable=line-too-long
        )

    def get_pipeline(self, path, caps, controls=None):
        """Build the sender pipeline description for the bound device.

        Args:
            path: Device path passed to ``v4l2src``.
            caps: Caps string inserted after the source element.
            controls: Optional ``extra-controls`` settings; when falsy, the
                device's default controls (if any) are used.

        Returns:
            The pipeline as a single space-separated string ending in
            ``matroskamux streamable=true ! fdsink``.
        """
        match = (self.video.vendor_id, self.video.model_id)
        _, default_controls, inner = _DEVICE_PROFILES.get(match, _FALLBACK_PROFILE)
        if not controls:
            controls = default_controls

        pipeline = f"v4l2src device={path} "
        if controls:
            pipeline += f"extra-controls=c,{controls} "
        pipeline += f"! {caps} "
        if inner:
            pipeline += f"! {inner} "
        pipeline += "! matroskamux streamable=true ! fdsink"
        return pipeline

    @Driver.check_active
    def stream(self, caps_hint=None, controls=None):
        """Run a sender and a local ``playbin`` receiver until one of them exits.

        The sender command is the resource's ``command_prefix`` followed by
        ``gst-launch-1.0 -q`` and the pipeline from :meth:`get_pipeline`; its
        stdout is piped into the receiver's stdin. This call blocks until
        either process terminates, then stops and reaps both.

        Args:
            caps_hint: Quality name forwarded to :meth:`select_caps`.
            controls: Controls forwarded to :meth:`get_pipeline`.
        """
        caps = self.select_caps(caps_hint)
        pipeline = self.get_pipeline(self.video.path, caps, controls)

        tx_cmd = self.video.command_prefix + ["gst-launch-1.0", "-q"]
        tx_cmd += pipeline.split()
        rx_cmd = ["gst-launch-1.0"]
        rx_cmd += "playbin uri=fd://0".split()

        tx = subprocess.Popen(
            tx_cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
        )
        # Processes to stop on exit, receiver first once it has been started.
        started = [tx]
        try:
            rx = subprocess.Popen(
                rx_cmd,
                stdin=tx.stdout,
                stdout=subprocess.DEVNULL,
            )
            started.insert(0, rx)
            # The receiver owns its own copy of the pipe now; drop ours so
            # the sender sees a broken pipe if the receiver goes away.
            tx.stdout.close()

            # wait until one subprocess has terminated
            finished = False
            while not finished:
                for proc in (tx, rx):
                    try:
                        proc.wait(timeout=0.1)
                    except subprocess.TimeoutExpired:
                        continue
                    finished = True
                    break
        finally:
            # Runs on normal exit, on a failed receiver start and on
            # interruption, so no child process or pipe is left behind.
            tx.stdout.close()
            for proc in started:
                proc.terminate()
            for proc in started:
                _reap(proc)