Source code for labgrid.driver.usbvideodriver

# pylint: disable=no-member
import subprocess
import attr

from .common import Driver
from ..factory import target_factory
from ..resource.remote import NetworkUSBVideo
from ..resource.udev import USBVideo
from ..exceptions import InvalidConfigError

[docs]@target_factory.reg_driver @attr.s(cmp=False) class USBVideoDriver(Driver): bindings = { "video": {USBVideo, NetworkUSBVideo}, }
[docs] def get_caps(self): match = (self.video.vendor_id, self.video.model_id) if match == (0x046d, 0x082d): return ("mid", [ ("low", "video/x-h264,width=640,height=360,framerate=5/1"), ("mid", "video/x-h264,width=1280,height=720,framerate=15/2"), ("high", "video/x-h264,width=1920,height=1080,framerate=10/1"), ]) raise InvalidConfigError("Unkown USB video device {:04x}:{:04x}".format(*match))
[docs] def select_caps(self, hint=None): default, variants = self.get_caps() variant = hint if hint else default for name, caps in variants: if name == variant: return caps raise InvalidConfigError("Unkown video format {} for device {:04x}:{:04x}".format( variant, self.video.vendor_id, self.video.model_id))
[docs] @Driver.check_active def stream(self, caps_hint=None): caps = self.select_caps(caps_hint) tx_cmd = self.video.command_prefix + ["gst-launch-1.0"] tx_cmd += "v4l2src ! {} ! h264parse ! fdsink".format(caps).split() rx_cmd = ["gst-launch-1.0"] rx_cmd += "fdsrc ! h264parse ! avdec_h264 ! glimagesink sync=false".split() tx = subprocess.Popen( tx_cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, ) rx = subprocess.Popen( rx_cmd, stdin=tx.stdout, stdout=subprocess.DEVNULL, ) # wait until one subprocess has termianted while True: try: tx.wait(timeout=0.1) break except subprocess.TimeoutExpired: pass try: rx.wait(timeout=0.1) break except subprocess.TimeoutExpired: pass rx.terminate() tx.terminate()