# pylint: disable=no-member
import subprocess
import attr
from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
[docs]@target_factory.reg_driver
@attr.s(eq=False)
class USBVideoDriver(Driver):
bindings = {
"video": {"USBVideo", "NetworkUSBVideo"},
}
[docs] def get_caps(self):
match = (self.video.vendor_id, self.video.model_id)
if match == (0x046d, 0x082d):
return ("mid", [
("low", "video/x-h264,width=640,height=360,framerate=5/1"),
("mid", "video/x-h264,width=1280,height=720,framerate=15/2"),
("high", "video/x-h264,width=1920,height=1080,framerate=10/1"),
])
if match == (0x046d, 0x0892):
return ("mid", [
("low", "image/jpeg,width=640,height=360,framerate=5/1"),
("mid", "image/jpeg,width=1280,height=720,framerate=15/2"),
("high", "image/jpeg,width=1920,height=1080,framerate=10/1"),
])
raise InvalidConfigError("Unkown USB video device {:04x}:{:04x}".format(*match))
[docs] def select_caps(self, hint=None):
default, variants = self.get_caps()
variant = hint if hint else default
for name, caps in variants:
if name == variant:
return caps
raise InvalidConfigError("Unkown video format {} for device {:04x}:{:04x}".format(
variant, self.video.vendor_id, self.video.model_id))
[docs] def get_pipeline(self):
match = (self.video.vendor_id, self.video.model_id)
if match == (0x046d, 0x082d):
return "v4l2src device={} ! {} ! h264parse ! fdsink"
if match == (0x046d, 0x0892):
return "v4l2src device={} ! {} ! decodebin ! vaapipostproc ! vaapih264enc ! h264parse ! fdsink" # pylint: disable=line-too-long
raise InvalidConfigError("Unkown USB video device {:04x}:{:04x}".format(*match))
[docs] @Driver.check_active
def stream(self, caps_hint=None):
caps = self.select_caps(caps_hint)
pipeline = self.get_pipeline()
tx_cmd = self.video.command_prefix + ["gst-launch-1.0"]
tx_cmd += pipeline.format(self.video.path, caps).split()
rx_cmd = ["gst-launch-1.0"]
rx_cmd += "fdsrc ! h264parse ! avdec_h264 ! glimagesink sync=false".split()
tx = subprocess.Popen(
tx_cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
)
rx = subprocess.Popen(
rx_cmd,
stdin=tx.stdout,
stdout=subprocess.DEVNULL,
)
# wait until one subprocess has termianted
while True:
try:
tx.wait(timeout=0.1)
break
except subprocess.TimeoutExpired:
pass
try:
rx.wait(timeout=0.1)
break
except subprocess.TimeoutExpired:
pass
rx.terminate()
tx.terminate()