import warnings
import inspect
from functools import wraps
from time import monotonic
# TODO: collect events from all Steps and combine when possible, only flush
# after some time
[docs]class Steps:
[docs] def __init__(self):
self._stack = []
self._subscribers = []
[docs] def get_current(self):
return self._stack[-1] if self._stack else None
[docs] def get_new(self, title, tag, source):
step = Step(title, level=len(self._stack) + 1, tag=tag, source=source)
return step
[docs] def push(self, step):
assert step not in self._stack
self._stack.append(step)
step.parent = self.get_current()
step.level = len(self._stack)
[docs] def pop(self, step):
assert self._stack[-1] is step
self._stack.pop()
[docs] def subscribe(self, callback):
self._subscribers.append(callback)
[docs] def unsubscribe(self, callback):
assert callback in self._subscribers
self._subscribers.remove(callback)
[docs] def notify(self, event):
# TODO: buffer and try to merge consecutive events
for subscriber in self._subscribers:
try:
subscriber(event)
except Exception as e: # pylint: disable=broad-except
warnings.warn("unhandled exception during event notification: {}".format(e))
steps = Steps()
[docs]class StepEvent:
[docs] def __init__(self, step, data, *, resource=None, stream=False):
self.ts = monotonic() # used to keep track of the events age
self.step = step
self.data = data
self.resource = resource
self.stream = stream
[docs] def __str__(self):
result = [str(self.step)]
if self.resource:
result.append(self.resource.__class__.__name__)
data = self.data.copy()
duration = data.pop('duration', 0.0)
pairs = ["{}={}".format(k, repr(v)) for k, v in data.items() if v is not None]
if duration >= 0.001:
pairs.append("duration={:.3f}".format(duration))
result.append(", ".join(pairs))
return " ".join(result)
[docs] def __setitem__(self, k, v):
self.data[k] = v
def _invalidate(self):
self.ts = None
self.step = None
self.data = None
self.resource = None
self.stream = None
[docs] def merge(self, other):
if not self.stream and not other.stream:
return False
if self.ts > other.ts:
return False
if self.resource is not other.resource:
return False
if self.data.keys() != other.data.keys():
return False
for k, v in other.data:
self.data[k] += v
other._invalidate()
return True
@property
def age(self):
return monotonic() - self.ts
# TODO: allow attaching log information, using a Resource as meta-data
[docs]class Step:
[docs] def __init__(self, title, level, tag, source):
self.title = title
self.level = level
self.source = source
self.tag = tag
self.args = None
self.result = None
self.exception = None
self._start_ts = None
self._stop_ts = None
self._skipped = False
[docs] def __repr__(self):
result = [
"Step(title={!r}, level={}, status={}".format(
self.title,
self.level,
self.status,
)
]
if self.args is not None:
result.append(", args={}".format(self.args))
if self.exception is not None:
result.append(", exception={}".format(self.exception))
if self.result is not None:
result.append(", result={}".format(self.result))
duration = self.duration
if duration >= 0.001:
result.append(", duration={:.3f}".format(duration))
result.append(")")
return "".join(result)
[docs] def __str__(self):
return "{}".format(self.title)
@property
def duration(self):
if self._start_ts is None:
return 0.0
if self._stop_ts is None:
return monotonic() - self._start_ts
return self._stop_ts - self._start_ts
@property
def status(self):
if self._start_ts is None:
return 'new'
if self._stop_ts is None:
return 'active'
return 'done'
@property
def is_active(self):
return self.status == 'active'
@property
def is_done(self):
return self.status == 'done'
def _notify(self, event: StepEvent):
assert event.step is self
steps.notify(event)
[docs] def start(self):
assert self._start_ts is None
self._start_ts = monotonic()
steps.push(self)
self._notify(StepEvent(self, {
'state': 'start',
'args': self.args,
}))
[docs] def skip(self, reason):
assert self._start_ts is not None
self._notify(StepEvent(self, {'skip': reason}))
[docs] def stop(self):
assert self._start_ts is not None
assert self._stop_ts is None
self._stop_ts = monotonic()
event = StepEvent(self, {'state': 'stop'})
if self.exception:
event['exception'] = self.exception
else:
event['result'] = self.result
duration = self.duration
if duration:
event['duration'] = duration
self._notify(event)
steps.pop(self)
[docs] def __del__(self):
if not self.is_done:
warnings.warn("__del__ called before {} was done".format(step))
[docs]def step(*, title=None, args=[], result=False, tag=None): # pylint: disable=unused-argument
def decorator(func):
# resolve default title
nonlocal title
title = title or func.__name__
signature = inspect.signature(func)
@wraps(func)
def wrapper(*_args, **_kwargs):
bound = signature.bind_partial(*_args, **_kwargs)
bound.apply_defaults()
source = func.__self__ if inspect.ismethod(func) else bound.arguments.get('self')
step = steps.get_new(title, tag, source)
# optionally pass the step object
if 'step' in signature.parameters:
_kwargs['step'] = step
if args:
step.args = {k: bound.arguments[k] for k in args}
step.start()
try:
_result = func(*_args, **_kwargs)
if result:
step.result = _result
except Exception as e:
step.exception = e
raise
finally:
step.stop()
return _result
return wrapper
return decorator