import fcntl
import os
import logging
import pty
import re
import select
import subprocess
import errno
from socket import socket, AF_INET, SOCK_STREAM
from contextlib import closing
import attr
from ..step import step
# Matches terminal escape sequences: either a CSI introducer (ESC "[" or the
# single character 0x9b) followed by any run of characters outside "@"-"_" and
# "a"-"z" and then one terminating character from those ranges, or a
# two-character sequence of ESC plus one character from those ranges.
re_vt100 = re.compile(r"(\x1b\[|\x9b)[^@-_a-z]*[@-_a-z]|\x1b[@-_a-z]")
[docs]
def get_free_port():
    """Return a TCP port number the OS reported as free at call time.

    A temporary IPv4 stream socket is bound to port 0 so that the kernel
    picks the port; the socket is closed again before returning.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        sock.bind(('', 0))
        address = sock.getsockname()
        return address[1]
    finally:
        sock.close()
[docs]
def get_user():
    """Return the name of the current user.

    The ``USER`` environment variable is preferred; when it is unset or
    empty, the name is looked up in the password database by the uid of
    the running process.
    """
    name = os.environ.get("USER")
    if not name:
        # imported lazily: only needed when the environment has no USER
        import pwd
        name = pwd.getpwuid(os.getuid())[0]
    return name
[docs]
def set_nonblocking(fd):
    """Switch the file descriptor *fd* to non-blocking mode.

    The current file status flags are read and written back with
    ``O_NONBLOCK`` added, leaving all other flags as they were.
    """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    wanted_flags = current_flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, wanted_flags)
[docs]
@attr.s
class ProcessWrapper:
    """Run commands on a pty and hand their output to registered callbacks.

    Callbacks are called as ``callback(chunk, process)`` where ``chunk`` is a
    ``bytes`` fragment of the command's combined stdout/stderr and ``process``
    is the ``subprocess.Popen`` object of the running command.
    """
    # callables invoked for every output fragment, in registration order
    callbacks = attr.ib(default=attr.Factory(list))
    # log level used for the command line and for log_callback messages
    loglevel = logging.INFO

    @step(args=['command'], result=True, tag='process')
    def check_output(self, command, *, print_on_silent_log=False, input=None, stdin=None):  # pylint: disable=redefined-builtin
        """Run a command and supply the output to callback functions

        stdout and stderr of the command are both connected to a freshly
        opened pty. The output is split at carriage returns and every
        fragment is passed to all registered callbacks.

        Args:
            command: argument list handed to ``subprocess.Popen``; it is also
                joined with spaces for the log message.
            print_on_silent_log: if true and the "Process" logger would not
                emit messages at ``ProcessWrapper.loglevel``, the print
                callback is registered for the duration of this call (unless
                it is registered already).
            input: optional bytes written to the command's stdin via a pipe.
            stdin: passed to ``subprocess.Popen`` as ``stdin`` when ``input``
                is None.

        Returns:
            The collected output as bytes, with the fragments joined by
            newlines.

        Raises:
            subprocess.CalledProcessError: if the command exits with a
                non-zero return code; ``output`` holds the collected output.
        """
        logger = logging.getLogger("Process")
        res = []
        buf = b""
        print_enabled = False

        process, mfd, stdin_w = self._spawn(command, input, stdin)
        try:
            try:
                logger.log(ProcessWrapper.loglevel, "[%d] command: %s", process.pid, " ".join(command))

                # do not register/unregister already registered print_callback
                if ProcessWrapper.print_callback in self.callbacks:
                    print_on_silent_log = False
                if print_on_silent_log and logger.getEffectiveLevel() > ProcessWrapper.loglevel:
                    self.enable_print()
                    print_enabled = True

                read_fds = [mfd]
                write_fds = []
                if stdin_w is not None:
                    write_fds.append(stdin_w)

                while True:
                    ready_r, ready_w, _ = select.select(read_fds, write_fds, [], 0.1)

                    if mfd in ready_r:
                        raw = None
                        try:
                            raw = os.read(mfd, 4096)
                        except BlockingIOError:
                            pass
                        except OSError as e:
                            # EIO: every slave end of the pty has been closed
                            if e.errno == errno.EIO:
                                break
                            raise
                        if raw:
                            buf = self._dispatch(buf + raw, res, process)

                    if stdin_w in ready_w:
                        if input:
                            amt = 0
                            try:
                                amt = os.write(stdin_w, input)
                            except BlockingIOError:
                                pass
                            except OSError as e:
                                if e.errno == errno.EIO:
                                    break
                                raise
                            input = input[amt:]
                        if not input:
                            # everything written: close the pipe so the
                            # command sees end-of-file on its stdin
                            write_fds.remove(stdin_w)
                            os.close(stdin_w)
                            stdin_w = None

                    process.poll()
                    if process.returncode is not None:
                        break

                # The command may have exited while output was still queued
                # in the pty; collect it instead of dropping it.
                buf = self._drain(mfd, buf, res, process)
            finally:
                # release our descriptors even if a callback or a system
                # call raised
                if stdin_w is not None:
                    os.close(stdin_w)
                os.close(mfd)

            process.wait()

            if buf:
                # process incomplete line
                res.append(buf)
                # bytes.endswith is used because indexing bytes yields an int,
                # which never compares equal to b'\n'
                if not buf.endswith(b'\n'):
                    buf += b'\n'
                for callback in self.callbacks:
                    callback(buf, process)
        finally:
            # only undo what this call enabled itself
            if print_enabled:
                self.disable_print()

        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode,
                                                command,
                                                output=b'\r'.join(res))
        # this converts '\r\n' to '\n' to be more compatible to the behaviour
        # of the normal subprocess module
        return b'\n'.join([r.strip(b'\n') for r in res])

    @staticmethod
    def _spawn(command, data, stdin):
        """Start command on a new pty and return (process, mfd, stdin_w).

        ``mfd`` is the non-blocking master end of the pty. ``stdin_w`` is the
        non-blocking write end of the stdin pipe, or None when ``data`` is
        None. No descriptor opened here stays open if starting fails.
        """
        mfd, sfd = pty.openpty()
        stdin_r = None
        stdin_w = None
        try:
            set_nonblocking(mfd)
            kwargs = {}
            if data is not None:
                stdin_r, stdin_w = os.pipe()
                kwargs['stdin'] = stdin_r
                set_nonblocking(stdin_w)
            elif stdin is not None:
                kwargs['stdin'] = stdin
            process = subprocess.Popen(command, stderr=sfd,
                                       stdout=sfd, bufsize=0, **kwargs)
        except BaseException:
            os.close(mfd)
            if stdin_w is not None:
                os.close(stdin_w)
            raise
        finally:
            # the parent must not keep the child's ends open:
            # close sfd so we notice when the child is gone
            os.close(sfd)
            if stdin_r is not None:
                os.close(stdin_r)
        return process, mfd, stdin_w

    def _dispatch(self, data, res, process):
        """Split data at carriage returns and return the trailing remainder.

        All complete fragments are appended to ``res`` and passed to every
        registered callback.
        """
        *parts, rest = data.split(b'\r')
        res.extend(parts)
        for part in parts:
            for callback in self.callbacks:
                callback(part, process)
        return rest

    def _drain(self, mfd, buf, res, process):
        """Read what is immediately available on mfd; return the remainder.

        Stops as soon as a read would block, reports EIO or returns no data.
        """
        while True:
            try:
                raw = os.read(mfd, 4096)
            except BlockingIOError:
                return buf
            except OSError as e:
                if e.errno == errno.EIO:
                    return buf
                raise
            if not raw:
                return buf
            buf = self._dispatch(buf + raw, res, process)

    def register(self, callback):
        """Register a callback with the ProcessWrapper"""
        if callback in self.callbacks:
            return
        self.callbacks.append(callback)

    def unregister(self, callback):
        """Unregister a callback with the ProcessWrapper"""
        if callback not in self.callbacks:
            return
        self.callbacks.remove(callback)

    @staticmethod
    def log_callback(message, process):
        """Logs process output message along with its pid."""
        logger = logging.getLogger("Process")
        message = message.decode(encoding="utf-8", errors="replace").strip("\n")
        if message:
            logger.log(ProcessWrapper.loglevel, "[%d] %s", process.pid, message)

    @staticmethod
    def print_callback(message, _):
        """Prints process output message."""
        message = message.decode(encoding="utf-8", errors="replace")
        print(f"\r{message}", end='')

    def enable_logging(self):
        """Enables process output to the logging interface."""
        self.register(ProcessWrapper.log_callback)

    def disable_logging(self):
        """Disables process output logging."""
        self.unregister(ProcessWrapper.log_callback)

    def enable_print(self):
        """Enables process output to print."""
        self.register(ProcessWrapper.print_callback)

    def disable_print(self):
        """Disables process output printing."""
        self.unregister(ProcessWrapper.print_callback)
# Module-level ProcessWrapper instance, created when this module is imported.
processwrapper = ProcessWrapper()