import warnings
import inspect
from functools import wraps
from time import monotonic
# TODO: collect events from all Steps and combine when possible, only flush
# after some time
[docs]class Steps:
def __init__(self):
self._stack = []
self._subscribers = []
[docs] def get_current(self):
return self._stack[-1] if self._stack else None
[docs] def get_new(self, title):
step = Step(title, level=len(self._stack) + 1)
return step
[docs] def push(self, step):
assert step not in self._stack
self._stack.append(step)
step.parent = self.get_current()
step.level = len(self._stack)
[docs] def pop(self, step):
assert self._stack[-1] is step
self._stack.pop()
[docs] def subscribe(self, callback):
self._subscribers.append(callback)
[docs] def notify(self, event):
# TODO: buffer and try to merge consecutive events
for subscriber in self._subscribers:
subscriber(event)
steps = Steps()
[docs]class StepEvent:
def __init__(self, step, data, *, resource=None, stream=False):
self.ts = monotonic() # used to keep track of the events age
self.step = step
self.data = data
self.resource = resource
self.stream = stream
def __str__(self):
result = [str(self.step)]
if self.resource:
result.append(self.resource.__class__.__name__)
result.append(", ".join(
"{}={}".format(k, repr(v)) for k, v in self.data.items() if v is not None
))
return " ".join(result)
def _invalidate(self):
self.ts = None
self.step = None
self.data = None
self.resource = resource
self.stream = None
[docs] def merge(self, other):
if not self.stream and not other.stream:
return False
if self.ts > other.ts:
return False
if self.resource is not other.resource:
return False
if self.data.keys() != other.data.keys():
return False
for k, v in other.data:
self.data[k] += v
other._invalidate()
return True
@property
def age(self):
return monotonic() - self.ts
# TODO: allow attaching log information, using a Resource as meta-data
[docs]class Step:
def __init__(self, title, level):
self.title = title
self.level = level
self.args = None
self.result = None
self._start_ts = None
self._stop_ts = None
self._skipped = False
def __repr__(self):
result = [
"Step(title={!r}, level={}, status={}".format(
self.title,
self.level,
self.status,
)
]
if self.args is not None:
result.append(", args={}".format(self.args))
if self.result is not None:
result.append(", result={}".format(self.result))
duration = self.duration
if duration >= 0.001:
result.append(", duration={:.3f}".format(duration))
result.append(")")
return "".join(result)
def __str__(self):
return "{}".format(self.title)
@property
def duration(self):
if self._start_ts is None:
return 0.0
elif self._stop_ts is None:
return monotonic() - self._start_ts
else:
return self._stop_ts - self._start_ts
@property
def status(self):
if self._start_ts is None:
return 'new'
elif self._stop_ts is None:
return 'active'
else:
return 'done'
@property
def is_active(self):
return self.status == 'active'
@property
def is_done(self):
return self.status == 'done'
def _notify(self, event: StepEvent):
assert event.step is self
steps.notify(event)
[docs] def start(self):
assert self._start_ts is None
self._start_ts = monotonic()
steps.push(self)
self._notify(StepEvent(self, {
'state': 'start',
'args': self.args,
}))
[docs] def skip(self, reason):
assert self._start_ts is not None
self._notify(StepEvent(self, {'skip': reason}))
[docs] def stop(self):
assert self._start_ts is not None
assert self._stop_ts is None
self._stop_ts = monotonic()
# TODO: report duration
self._notify(StepEvent(self, {
'state': 'stop',
'result': self.result,
}))
steps.pop(self)
def __del__(self):
if not self.is_done:
warnings.warn("__del__ called before {} was done".format(step))
[docs]def step(*, title=None, args=[], result=False):
def decorator(func):
@wraps(func)
def wrapper(*_args, **_kwargs):
if title is None:
step = steps.get_new(func.__name__)
else:
step = steps.get_new(title)
# optionally pass the step object
if 'step' in inspect.signature(func).parameters:
_kwargs['step'] = step
if args:
captured = inspect.getcallargs(func, *_args, **_kwargs)
step.args = {k: captured[k] for k in args}
step.start()
try:
_result = func(*_args, **_kwargs)
if result:
step.result = _result
finally:
step.stop()
return _result
return wrapper
return decorator