import fcntl
import os
import logging
import pty
import select
import subprocess
from socket import socket, AF_INET, SOCK_STREAM
from contextlib import closing
import attr
from ..step import step
[docs]def get_free_port():
"""Helper function to always return an unused port."""
with closing(socket(AF_INET, SOCK_STREAM)) as s:
s.bind(('', 0))
return s.getsockname()[1]
[docs]def get_user():
"""Get the username of the current user."""
user = os.environ.get("USER")
if user:
return user
import pwd
return pwd.getpwuid(os.getuid())[0]
[docs]@attr.s
class ProcessWrapper:
callbacks = attr.ib(default=attr.Factory(list))
loglevel = logging.INFO
[docs] @step(args=['command'], result=True, tag='process')
def check_output(self, command, *, print_on_silent_log=False):
"""Run a command and supply the output to callback functions"""
logger = logging.getLogger("Process")
res = []
mfd, sfd = pty.openpty()
flags = fcntl.fcntl(mfd, fcntl.F_GETFL)
fcntl.fcntl(mfd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
process = subprocess.Popen(command, stderr=sfd,
stdout=sfd, bufsize=0)
logger.log(ProcessWrapper.loglevel, "[%d] command: %s", process.pid, " ".join(command))
# do not register/unregister already registered print_callback
if ProcessWrapper.print_callback in self.callbacks:
print_on_silent_log = False
if print_on_silent_log and logger.getEffectiveLevel() > ProcessWrapper.loglevel:
self.enable_print()
# close sfd so we notice when the child is gone
os.close(sfd)
# get a file object from the fd
buf = b""
while True:
try:
raw = os.read(mfd, 4096)
except BlockingIOError as ex:
# wait for new data and retry
select.select([mfd], [], [mfd], 0.1)
continue
except OSError as e:
if e.errno == 5:
break
raise
if raw:
buf += raw
*parts, buf = buf.split(b'\r')
res.extend(parts)
for part in parts:
for callback in self.callbacks:
callback(part, process)
process.poll()
if process.returncode is not None:
break
os.close(mfd)
process.wait()
if buf:
# process incomplete line
res.append(buf)
if buf[-1] != b'\n':
buf += b'\n'
for callback in self.callbacks:
callback(buf, process)
if print_on_silent_log and logger.getEffectiveLevel() > ProcessWrapper.loglevel:
self.disable_print()
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
command,
output=b'\r'.join(res))
# this converts '\r\n' to '\n' to be more compatible to the behaviour
# of the normal subprocess module
return b'\n'.join([r.strip(b'\n') for r in res])
[docs] def register(self, callback):
"""Register a callback with the ProcessWrapper"""
if callback in self.callbacks:
return
self.callbacks.append(callback)
[docs] def unregister(self, callback):
"""Unregister a callback with the ProcessWrapper"""
if callback not in self.callbacks:
return
self.callbacks.remove(callback)
[docs] @staticmethod
def log_callback(message, process):
"""Logs process output message along with its pid."""
logger = logging.getLogger("Process")
message = message.decode(encoding="utf-8", errors="replace").strip("\n")
if message:
logger.log(ProcessWrapper.loglevel, "[%d] %s", process.pid, message)
[docs] @staticmethod
def print_callback(message, _):
"""Prints process output message."""
message = message.decode(encoding="utf-8", errors="replace")
print(f"\r{message}", end='')
[docs] def enable_logging(self):
"""Enables process output to the logging interface."""
self.register(ProcessWrapper.log_callback)
[docs] def disable_logging(self):
"""Disables process output logging."""
self.unregister(ProcessWrapper.log_callback)
[docs] def enable_print(self):
"""Enables process output to print."""
self.register(ProcessWrapper.print_callback)
[docs] def disable_print(self):
"""Disables process output printing."""
self.unregister(ProcessWrapper.print_callback)
processwrapper = ProcessWrapper()