Source code for labgrid.driver.usbtmcdriver

from importlib import import_module
from decimal import Decimal
import attr

from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
from ..util.agentwrapper import AgentWrapper, b2s, s2b

[docs] @target_factory.reg_driver @attr.s(eq=False) class USBTMCDriver(Driver): bindings = { "tmc": {"USBTMC", "NetworkUSBTMC"}, }
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.wrapper = None self.backend = None assert self.tmc.path.startswith('/dev/usbtmc') self.index = int(self.tmc.path[11:], 10) assert self.tmc.path == '/dev/usbtmc'+str(self.index)
[docs] def on_activate(self): assert self.wrapper is None self.wrapper = AgentWrapper( match = (self.tmc.vendor_id, self.tmc.model_id) if match == (0x0957, 0x1798): model = 'keysight_dsox2000' elif match == (0x0699, 0x0368): model = 'tektronix_tds2000' else: raise InvalidConfigError("Unkown USB TMC device {:04x}:{:04x}".format(*match)) # TODO: allow backends to register models with other names self.backend = import_module( f".usbtmc.{model}", __package__ )
[docs] def on_deactivate(self): assert self.wrapper is not None self.wrapper.close() self.wrapper = None self.backend = None
[docs] @Driver.check_active def command(self, cmd): assert isinstance(cmd, str) cmd = b2s(cmd.encode('ASCII')+b'\n') self.wrapper.usbtmc(self.index, cmd, read=False)
[docs] @Driver.check_active def query(self, cmd, binary=False, raw=False): assert isinstance(cmd, str) cmd = b2s(cmd.encode('ASCII')+b'\n') res = s2b(self.wrapper.usbtmc(self.index, cmd, read=True)) if raw: return res if binary: assert res[0:1] == b'#' digits = int(res[1:2], 10) count = int(res[2:2+digits], 10) return res[2+digits:2+digits+count] assert res[-1:] == b'\n' return res[:-1].decode('ASCII')
[docs] @Driver.check_active def identify(self): return self.query('*IDN?')
[docs] @Driver.check_active def get_channel_info(self, channel): return self.backend.get_channel_info(self, channel)
[docs] @Driver.check_active def get_channel_values(self, channel): return self.backend.get_channel_values(self, channel)
[docs] @Driver.check_active def get_screenshot(self): png = getattr(self.backend, 'get_screenshot_png', None) tiff = getattr(self.backend, 'get_screenshot_tiff', None) if png: return 'png', png(self) # pylint: disable=not-callable if tiff: return 'tiff', tiff(self) # pylint: disable=not-callable raise InvalidConfigError("get_screenshot_png/_tiff not implemented")
[docs] @Driver.check_active def get_bool(self, cmd): return bool(int(self.query(cmd).strip()))
[docs] @Driver.check_active def get_int(self, cmd): return int(self.query(cmd).strip())
[docs] @Driver.check_active def get_decimal(self, cmd): return Decimal(self.query(cmd).strip())
[docs] @Driver.check_active def get_str(self, cmd): return self.query(cmd)