#!/usr/bin/python3
import argparse
import asyncio
import logging
import os
import struct
import sys
from abc import ABC, abstractmethod
# Length prefix used for frames on the byte stream: a single unsigned 16-bit
# little-endian integer holding the number of payload bytes that follow.
HEADER = struct.Struct("<H")
class Pipe(ABC):
    """Base class for a one-directional copy between two file descriptors.

    Subclasses implement :meth:`read_in` and :meth:`write_out`, which are
    registered as event-loop reader/writer callbacks by :meth:`stream`.
    The callbacks communicate with :meth:`stream` through ``self.frame``
    (a complete unit waiting to be written), ``self.buffer`` (partially
    transferred bytes) and the two events.
    """

    def __init__(self, in_fd, out_fd):
        """Set up the transfer state for copying from *in_fd* to *out_fd*.

        Must be called while an event loop is running, because the running
        loop is looked up here and stored for later use.
        """
        self.frame = b""
        self.buffer = b""
        self.read_event = asyncio.Event()
        self.write_event = asyncio.Event()
        self.eof = False
        self.loop = asyncio.get_running_loop()
        self.in_fd = in_fd
        self.out_fd = out_fd
        # Replaceable hook; by default EOF only affects this pipe.
        self.eof_handler = self.set_eof

    def _handle_eof(self):
        """Invoke the currently installed EOF handler."""
        self.eof_handler()

    def set_eof(self):
        """Mark this pipe as finished and wake :meth:`stream` if it is waiting."""
        self.eof = True
        self.read_event.set()
        self.write_event.set()

    async def stream(self):
        """Alternate between reading one frame and writing it out until EOF.

        The reader callback is only registered while no frame is pending and
        the writer callback only while there is data to write.
        """
        while not self.eof:
            self.read_event.clear()
            self.loop.add_reader(self.in_fd, self.read_in)
            try:
                while not self.frame and not self.eof:
                    await self.read_event.wait()
                    # Re-arm the event.  Waiting on an already-set event
                    # returns immediately without yielding to the loop, so a
                    # wake-up that delivered no frame would otherwise spin
                    # here forever and starve every other task.
                    self.read_event.clear()
            finally:
                # Also runs on cancellation/errors so the callback is not
                # left registered on the loop.
                self.loop.remove_reader(self.in_fd)

            self.write_event.clear()
            self.loop.add_writer(self.out_fd, self.write_out)
            try:
                while (self.frame or self.buffer) and not self.eof:
                    await self.write_event.wait()
                    self.write_event.clear()
            finally:
                self.loop.remove_writer(self.out_fd)

    @abstractmethod
    def read_in(self):
        """Reader callback: try to obtain data from ``in_fd``."""
        raise NotImplementedError("Must be implemented in derived class")

    @abstractmethod
    def write_out(self):
        """Writer callback: try to push pending data to ``out_fd``."""
        raise NotImplementedError("Must be implemented in derived class")
class StreamToTapPipe(Pipe):
    """Decode length-prefixed frames from the input stream and write them to the tap fd."""

    def read_until(self, length):
        """Read from ``in_fd`` until ``self.buffer`` holds at least *length* bytes.

        Bytes already read stay in ``self.buffer``, so the call can be
        repeated later to resume a partial read.

        Raises:
            BrokenPipeError: ``os.read`` returned no data (end of stream).
            OSError: whatever ``os.read`` raises, e.g. ``BlockingIOError``
                when a non-blocking fd has nothing more to deliver.
        """
        while len(self.buffer) < length:
            d = os.read(self.in_fd, length - len(self.buffer))
            if not d:
                raise BrokenPipeError()
            self.buffer += d

    def read_in(self):
        """Reader callback: assemble one complete frame into ``self.frame``.

        Returns silently while the frame is still incomplete; signals
        ``read_event`` once a frame is available or EOF was reached.
        """
        if not self.frame:
            try:
                self.read_until(HEADER.size)
                (length,) = HEADER.unpack_from(self.buffer)
                self.read_until(HEADER.size + length)
            except BlockingIOError:
                # Not enough data yet; keep the partial buffer and wait for
                # the next readable notification.
                return
            except BrokenPipeError:
                logging.debug("Read pipe is closed")
                self._handle_eof()
            else:
                frame = self.buffer[HEADER.size :]
                self.buffer = b""
                if not frame:
                    # A zero-length frame carries nothing to write.  Waking
                    # the streaming loop without a pending frame would make
                    # it wait on an already-set event, so drop it here.
                    logging.debug("Ignoring zero-length frame from pipe")
                    return
                self.frame = frame
                logging.debug("Read %d bytes from pipe", len(self.frame))

        self.read_event.set()

    def write_out(self):
        """Writer callback: write the pending frame to the tap with a single ``os.write``."""
        if self.frame:
            try:
                w = os.write(self.out_fd, self.frame)
                if not w:
                    raise BrokenPipeError()
                logging.debug("Wrote %d bytes to tap", w)
                self.frame = b""
            except BlockingIOError:
                # Tap not ready; the frame stays queued for the next callback.
                return
            except BrokenPipeError:
                logging.debug("Tap is closed")
                self._handle_eof()

        self.write_event.set()
class TapToStreamPipe(Pipe):
    """Read frames from the tap fd and write them, length-prefixed, to the output stream."""

    # Maximum number of bytes requested from the tap in one read.
    # NOTE(review): 1522 presumably corresponds to a VLAN-tagged Ethernet
    # frame at the standard MTU -- confirm before changing.
    MAX_FRAME_SIZE = 1522

    def read_in(self):
        """Reader callback: fetch one frame from the tap into ``self.frame``."""
        if not self.frame:
            try:
                d = os.read(self.in_fd, self.MAX_FRAME_SIZE)
                if not d:
                    raise BrokenPipeError()
                logging.debug("Read %d bytes from tap", len(d))
                self.frame = d
            except BlockingIOError:
                # Nothing to read right now; wait for the next notification.
                return
            except BrokenPipeError:
                logging.debug("Tap is closed")
                self._handle_eof()

        self.read_event.set()

    def write_out(self):
        """Writer callback: push the pending frame to the output stream.

        The frame is moved into ``self.buffer`` with its length header
        prepended, and the buffer is then written until it is empty, the
        fd would block, or the stream turns out to be closed.
        """
        if not self.buffer and self.frame:
            self.buffer = HEADER.pack(len(self.frame)) + self.frame
            self.frame = b""

        while self.buffer:
            try:
                w = os.write(self.out_fd, self.buffer)
                if not w:
                    raise BrokenPipeError()
                self.buffer = self.buffer[w:]
                logging.debug("Wrote %d bytes to pipe", w)
            except BlockingIOError:
                # Partial write; the remainder stays in self.buffer.
                return
            except BrokenPipeError:
                logging.debug("Write pipe is closed")
                self._handle_eof()
                # The buffer can never be flushed to a closed pipe; leaving
                # the loop here avoids retrying the failing write forever.
                break

        self.write_event.set()
async def pipe_loop(tap_fd, out_fd, in_fd):
    """Shuttle frames between a tap fd and a pair of stream fds until EOF.

    All three descriptors are switched to non-blocking mode.  Data arriving
    on *in_fd* is forwarded to *tap_fd*, and data from *tap_fd* is forwarded
    to *out_fd*.  EOF on either direction stops both.

    Returns the result of gathering both streaming coroutines.
    """
    for fd in (tap_fd, out_fd, in_fd):
        os.set_blocking(fd, False)

    in_to_tap = StreamToTapPipe(in_fd, tap_fd)
    tap_to_out = TapToStreamPipe(tap_fd, out_fd)
    pipes = (in_to_tap, tap_to_out)

    def handle_eof():
        # EOF in one direction shuts down the other direction as well.
        for pipe in pipes:
            pipe.set_eof()

    for pipe in pipes:
        pipe.eof_handler = handle_eof

    return await asyncio.gather(*(pipe.stream() for pipe in pipes))
def main():
    """Command-line entry point: stream a tap file descriptor over stdio.

    Parses the tap fd and verbosity from the command line, configures
    logging to stderr, moves the real stdin/stdout to private descriptors
    and then runs :func:`pipe_loop` until it finishes.
    """
    # The text is a description of the tool; passing it positionally would
    # set ``prog`` and garble the usage line.
    parser = argparse.ArgumentParser(description="Stream tap to stdio")
    parser.add_argument("fd", type=int, help="Tap FD to stream to stdio")
    parser.add_argument("--verbose", "-v", action="count", default=-1, help="Increase verbosity")
    args = parser.parse_args()

    # No flag -> WARNING, -v -> INFO, -vv (or more) -> DEBUG.
    if args.verbose >= 1:
        level = logging.DEBUG
    elif args.verbose >= 0:
        level = logging.INFO
    else:
        level = logging.WARNING

    root = logging.getLogger()
    root.setLevel(level)
    fmt = logging.Formatter(f"{os.getpid()} - %(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(fmt)
    root.addHandler(handler)

    # Duplicate stdin and stdout to new file descriptors for dedicated use by
    # the stream.
    with os.fdopen(os.dup(sys.stdin.fileno())) as in_f, os.fdopen(os.dup(sys.stdout.fileno())) as out_f:
        # Replace stdin with devnull
        with open(os.devnull, "r+") as devnull:
            os.dup2(devnull.fileno(), sys.stdin.fileno())

        # Replace stdout with stderr
        os.dup2(sys.stderr.fileno(), sys.stdout.fileno())

        asyncio.run(pipe_loop(args.fd, out_f.fileno(), in_f.fileno()))
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)