Source code for labgrid.step

import inspect
import os
import warnings
from functools import wraps
from time import monotonic


# TODO: collect events from all Steps and combine when possible, only flush
# after some time
class Steps:
    """Bookkeeping for the currently active steps and for event subscribers.

    Active steps are kept on a stack (innermost step last). Subscribers are
    callables which receive every event handed to :meth:`notify`.
    """

    def __init__(self):
        self._stack = []
        self._subscribers = []

    def get_current(self):
        """Return the innermost active step, or ``None`` if no step is active."""
        return self._stack[-1] if self._stack else None

    def get_new(self, title, tag, source, sourceinfo):
        """Create a new ``Step`` without pushing it onto the stack.

        The level is preset to one more than the current stack depth;
        :meth:`push` assigns it again once the step is actually pushed.
        """
        step = Step(  # pylint: disable=redefined-outer-name
            title,
            level=len(self._stack) + 1,
            tag=tag,
            source=source,
            sourceinfo=sourceinfo,
        )
        return step

    def push(self, step):  # pylint: disable=redefined-outer-name
        """Make ``step`` the innermost active step.

        Sets ``step.parent`` to the step that was innermost before the push
        (``None`` for an outermost step) and ``step.level`` to the new stack
        depth. The step must not already be on the stack.
        """
        assert step not in self._stack
        # The parent has to be looked up *before* appending; afterwards
        # get_current() would return the step itself.
        step.parent = self.get_current()
        self._stack.append(step)
        step.level = len(self._stack)

    def pop(self, step):  # pylint: disable=redefined-outer-name
        """Remove ``step`` from the stack; it must be the innermost step."""
        assert self._stack[-1] is step
        self._stack.pop()

    def subscribe(self, callback):
        """Register ``callback`` to be called with every notified event."""
        self._subscribers.append(callback)

    def unsubscribe(self, callback):
        """Remove a previously subscribed ``callback``."""
        assert callback in self._subscribers
        self._subscribers.remove(callback)

    def notify(self, event):
        """Pass ``event`` to every subscriber, in subscription order.

        An exception raised by a subscriber is reported as a warning and
        does not prevent the remaining subscribers from being called.
        """
        # TODO: buffer and try to merge consecutive events
        for subscriber in self._subscribers:
            try:
                subscriber(event)
            except Exception as e:  # pylint: disable=broad-except
                warnings.warn(f"unhandled exception during event notification: {e}")
# Module-level Steps instance; Step objects and the step() decorator in this
# module push, pop, create and notify through it.
steps = Steps()
class StepEvent:
    """An event emitted by a step.

    Attributes are the originating ``step``, a ``data`` dict describing the
    event, an optional ``resource``, a ``stream`` flag and the monotonic
    creation timestamp ``ts``.
    """

    def __init__(self, step, data, *, resource=None, stream=False):  # pylint: disable=redefined-outer-name
        self.ts = monotonic()  # used to keep track of the events age
        self.step = step
        self.data = data
        self.resource = resource
        self.stream = stream

    def __str__(self):
        """Format as ``<title> [<resource class>] k=v, ... [duration=x.xxx]``.

        Entries whose value is ``None`` are omitted; the duration is only
        shown when it is at least one millisecond.
        """
        result = [self.step.title]
        if self.resource:
            result.append(self.resource.__class__.__name__)
        # Work on a copy so that popping 'duration' leaves self.data intact.
        data = self.data.copy()
        duration = data.pop('duration', 0.0)
        pairs = [f"{k}={repr(v)}" for k, v in data.items() if v is not None]
        if duration >= 0.001:
            pairs.append(f"duration={duration:.3f}")
        result.append(", ".join(pairs))
        return " ".join(result)

    def __setitem__(self, k, v):
        """Set ``data[k] = v``."""
        self.data[k] = v

    def _invalidate(self):
        """Clear all attributes; used on an event that was merged into another."""
        self.ts = None
        self.step = None
        self.data = None
        self.resource = None
        self.stream = None

    def merge(self, other):
        """Try to fold ``other`` into this event.

        Merging is refused (returns ``False``) when neither event is a
        stream event, when ``other`` is older than this event, when the
        resources differ or when the data keys differ. Otherwise every value
        of ``other`` is added (``+=``) onto the value stored under the same
        key here, ``other`` is invalidated and ``True`` is returned.
        """
        if not self.stream and not other.stream:
            return False
        if self.ts > other.ts:
            return False
        if self.resource is not other.resource:
            return False
        if self.data.keys() != other.data.keys():
            return False
        # Iterate over key/value pairs; iterating the dict itself would only
        # yield the keys.
        for k, v in other.data.items():
            self.data[k] += v
        other._invalidate()
        return True

    @property
    def age(self):
        """Seconds elapsed (monotonic clock) since the event was created."""
        return monotonic() - self.ts
# TODO: allow attaching log information, using a Resource as meta-data
class Step:
    """A single timed unit of work which reports its progress as events.

    A step is ``'new'`` until :meth:`start` is called, ``'active'`` until
    :meth:`stop` is called and ``'done'`` afterwards.
    """

    def __init__(self, title, level, tag, source, sourceinfo):
        self.title = title
        self.level = level
        self.tag = tag
        self.source = source
        self.sourceinfo = sourceinfo
        self.args = None
        self.result = None
        self.exception = None
        self._start_ts = None
        self._stop_ts = None
        self._skipped = False

    def __repr__(self):
        """Return a compact description; unset fields and sub-millisecond durations are omitted."""
        result = [
            f"Step(title={self.title!r}, level={self.level}, status={self.status}"
        ]
        if self.args is not None:
            result.append(f", args={self.args}")
        if self.exception is not None:
            result.append(f", exception={self.exception}")
        if self.result is not None:
            result.append(f", result={self.result}")
        duration = self.duration
        if duration >= 0.001:
            result.append(f", duration={duration:.3f}")
        result.append(")")
        return "".join(result)

    @property
    def duration(self):
        """Elapsed seconds: 0.0 before start, running time while active, total once stopped."""
        if self._start_ts is None:
            return 0.0
        if self._stop_ts is None:
            return monotonic() - self._start_ts
        return self._stop_ts - self._start_ts

    @property
    def status(self):
        """One of ``'new'``, ``'active'`` or ``'done'``."""
        if self._start_ts is None:
            return 'new'
        if self._stop_ts is None:
            return 'active'
        return 'done'

    @property
    def is_active(self):
        """True while the step has been started but not stopped."""
        return self.status == 'active'

    @property
    def is_done(self):
        """True once the step has been stopped."""
        return self.status == 'done'

    def _notify(self, event: "StepEvent"):
        """Forward an event belonging to this step to the module-level ``steps``."""
        assert event.step is self
        steps.notify(event)

    def start(self):
        """Record the start time, push the step and emit a ``'start'`` event."""
        assert self._start_ts is None
        self._start_ts = monotonic()
        steps.push(self)
        self._notify(StepEvent(self, {
            'state': 'start',
            'args': self.args,
        }))

    def skip(self, reason):
        """Emit a ``'skip'`` event carrying ``reason``; the step must have been started."""
        assert self._start_ts is not None
        self._notify(StepEvent(self, {'skip': reason}))

    def stop(self):
        """Record the stop time, emit a ``'stop'`` event and pop the step.

        The event carries either the stored exception or the stored result,
        plus the duration when it is non-zero.
        """
        assert self._start_ts is not None
        assert self._stop_ts is None
        self._stop_ts = monotonic()
        event = StepEvent(self, {'state': 'stop'})
        if self.exception:
            event['exception'] = self.exception
        else:
            event['result'] = self.result
        duration = self.duration
        if duration:
            event['duration'] = duration
        self._notify(event)
        steps.pop(self)

    def __del__(self):
        """Warn when a step is destroyed before it reached the ``'done'`` state."""
        if not self.is_done:
            # Report this instance; the bare name ``step`` would refer to the
            # module-level decorator instead.
            warnings.warn(f"__del__ called before {self} was done")
def step(*, title=None, args=(), result=False, tag=None):
    """Decorator factory which runs the decorated callable inside a ``Step``.

    Args:
        title: title of the step; defaults to the decorated function's name.
        args: names of the call's bound arguments to record in ``step.args``.
        result: if true, the return value is stored in ``step.result``.
        tag: passed through to the created step.

    If the decorated function has a parameter named ``step``, the step
    object is passed to it as a keyword argument. An exception raised by the
    function is stored in ``step.exception`` and re-raised; the step is
    stopped in either case.
    """
    def decorator(func):
        # Resolve the default title per decorated function. Rebinding the
        # factory's ``title`` here would leak the first function's name into
        # every later use of the same decorator object.
        step_title = title or func.__name__

        signature = inspect.signature(func)

        @wraps(func)
        def wrapper(*_args, **_kwargs):
            bound = signature.bind_partial(*_args, **_kwargs)
            bound.apply_defaults()

            # The source is the bound instance for methods, otherwise
            # whatever was passed as ``self`` (if anything).
            source = func.__self__ if inspect.ismethod(func) else bound.arguments.get('self')
            pathname = func.__code__.co_filename
            sourceinfo = (pathname, os.path.basename(pathname), func.__code__.co_firstlineno)

            step = steps.get_new(step_title, tag, source, sourceinfo)  # pylint: disable=redefined-outer-name

            # optionally pass the step object
            if 'step' in signature.parameters:
                _kwargs['step'] = step

            if args:
                step.args = {k: bound.arguments[k] for k in args}

            step.start()
            try:
                _result = func(*_args, **_kwargs)
                if result:
                    step.result = _result
            except Exception as e:
                step.exception = e
                raise
            finally:
                step.stop()
            return _result

        return wrapper

    return decorator