# pylint: disable=no-member
from importlib import import_module
from decimal import Decimal
import attr
from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
from ..util.agentwrapper import AgentWrapper, b2s, s2b
[docs]@target_factory.reg_driver
@attr.s(eq=False)
class USBTMCDriver(Driver):
bindings = {
"tmc": {"USBTMC", "NetworkUSBTMC"},
}
[docs] def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None
self.backend = None
assert self.tmc.path.startswith('/dev/usbtmc')
self.index = int(self.tmc.path[11:], 10)
assert self.tmc.path == '/dev/usbtmc'+str(self.index)
[docs] def on_activate(self):
assert self.wrapper is None
self.wrapper = AgentWrapper(self.tmc.host)
match = (self.tmc.vendor_id, self.tmc.model_id)
if match == (0x0957, 0x1798):
model = 'keysight_dsox2000'
elif match == (0x0699, 0x0368):
model = 'tektronix_tds2000'
else:
raise InvalidConfigError("Unkown USB TMC device {:04x}:{:04x}".format(*match))
# TODO: allow backends to register models with other names
self.backend = import_module(
".usbtmc.{}".format(model),
__package__
)
[docs] def on_deactivate(self):
assert self.wrapper is not None
self.wrapper.close()
self.wrapper = None
self.backend = None
[docs] @Driver.check_active
def command(self, cmd):
assert isinstance(cmd, str)
cmd = b2s(cmd.encode('ASCII')+b'\n')
self.wrapper.usbtmc(self.index, cmd, read=False)
[docs] @Driver.check_active
def query(self, cmd, binary=False, raw=False):
assert isinstance(cmd, str)
cmd = b2s(cmd.encode('ASCII')+b'\n')
res = s2b(self.wrapper.usbtmc(self.index, cmd, read=True))
if raw:
return res
if binary:
assert res[0:1] == b'#'
digits = int(res[1:2], 10)
count = int(res[2:2+digits], 10)
return res[2+digits:2+digits+count]
assert res[-1:] == b'\n'
return res[:-1].decode('ASCII')
[docs] @Driver.check_active
def identify(self):
return self.query('*IDN?')
[docs] @Driver.check_active
def get_channel_info(self, channel):
return self.backend.get_channel_info(self, channel)
[docs] @Driver.check_active
def get_channel_values(self, channel):
return self.backend.get_channel_values(self, channel)
[docs] @Driver.check_active
def get_screenshot(self):
png = getattr(self.backend, 'get_screenshot_png', None)
tiff = getattr(self.backend, 'get_screenshot_tiff', None)
if png:
return 'png', png(self) # pylint: disable=not-callable
if tiff:
return 'tiff', tiff(self) # pylint: disable=not-callable
raise InvalidConfigError("get_screenshot_png/_tiff not implemented")
[docs] @Driver.check_active
def get_bool(self, cmd):
return bool(int(self.query(cmd).strip()))
[docs] @Driver.check_active
def get_int(self, cmd):
return int(self.query(cmd).strip())
[docs] @Driver.check_active
def get_decimal(self, cmd):
return Decimal(self.query(cmd).strip())
[docs] @Driver.check_active
def get_str(self, cmd):
return self.query(cmd)