# Source code for labgrid.tapfwd

#!/usr/bin/python3

import argparse
import asyncio
import logging
import os
import struct
import sys
from abc import ABC, abstractmethod

# Length prefix placed in front of every frame on the stream side: a single
# little-endian unsigned 16-bit integer ("<H") holding the payload size in
# bytes. TapToStreamPipe packs it when writing; StreamToTapPipe unpacks it
# when reading.
HEADER = struct.Struct("<H")


class Pipe(ABC):
    """Base class for forwarding data from one file descriptor to another.

    A pipe alternates between a read phase and a write phase, driven by
    :meth:`stream`. Subclasses implement :meth:`read_in` and
    :meth:`write_out`, which are registered as event loop reader/writer
    callbacks and communicate with :meth:`stream` through ``frame``,
    ``buffer`` and the two events.
    """

    def __init__(self, in_fd, out_fd):
        """Set up the pipe state.

        Must be called while an event loop is running, because the running
        loop is captured for later ``add_reader``/``add_writer`` calls.

        :param in_fd: file descriptor that :meth:`read_in` reads from
        :param out_fd: file descriptor that :meth:`write_out` writes to
        """
        # The read phase ends once ``frame`` is non-empty; the write phase
        # ends once both ``frame`` and ``buffer`` are empty.
        self.frame = b""
        self.buffer = b""
        self.read_event = asyncio.Event()
        self.write_event = asyncio.Event()
        self.eof = False
        self.loop = asyncio.get_running_loop()
        self.in_fd = in_fd
        self.out_fd = out_fd
        # Callers may replace this hook to react to EOF differently.
        self.eof_handler = self.set_eof

    def _handle_eof(self):
        """Invoke the currently installed EOF handler."""
        self.eof_handler()

    def set_eof(self):
        """Mark the pipe as finished and wake :meth:`stream` from either phase."""
        self.eof = True
        self.read_event.set()
        self.write_event.set()

    async def stream(self):
        """Alternate between reading a frame and writing it until EOF is set."""
        while not self.eof:
            self.read_event.clear()
            self.loop.add_reader(self.in_fd, self.read_in)
            try:
                while not self.frame and not self.eof:
                    await self.read_event.wait()
                    # Re-arm the event after every wake-up. Waiting on an
                    # already-set event returns without yielding to the event
                    # loop, so a wake-up that did not produce a frame would
                    # otherwise spin here forever and starve the callbacks.
                    self.read_event.clear()
            finally:
                # Always unregister, even on cancellation or error.
                self.loop.remove_reader(self.in_fd)

            self.write_event.clear()
            self.loop.add_writer(self.out_fd, self.write_out)
            try:
                while (self.frame or self.buffer) and not self.eof:
                    await self.write_event.wait()
                    # Same reasoning as for the read phase above.
                    self.write_event.clear()
            finally:
                self.loop.remove_writer(self.out_fd)

    @abstractmethod
    def read_in(self):
        """Reader callback: try to obtain data from ``in_fd``."""
        raise NotImplementedError("Must be implemented in derived class")

    @abstractmethod
    def write_out(self):
        """Writer callback: try to push pending data to ``out_fd``."""
        raise NotImplementedError("Must be implemented in derived class")
class StreamToTapPipe(Pipe):
    """Read length-prefixed frames from a byte stream and write them to the tap.

    Each frame on the stream is a ``HEADER`` length prefix followed by that
    many payload bytes. ``buffer`` accumulates the header and payload across
    callbacks until a frame is complete; ``frame`` then holds the payload
    until it has been written out.
    """

    def read_until(self, length):
        """Read from ``in_fd`` until ``buffer`` holds at least ``length`` bytes.

        :raises BrokenPipeError: if the stream reports end of file
        :raises BlockingIOError: propagated from ``os.read`` when no more
            data is available yet; bytes read so far stay in ``buffer``
        """
        while len(self.buffer) < length:
            d = os.read(self.in_fd, length - len(self.buffer))
            if not d:
                raise BrokenPipeError()
            self.buffer += d

    def read_in(self):
        """Reader callback: assemble the next frame from the stream."""
        if not self.frame:
            try:
                self.read_until(HEADER.size)
                (payload_len,) = HEADER.unpack_from(self.buffer)
                self.read_until(HEADER.size + payload_len)
                self.frame = self.buffer[HEADER.size :]
                self.buffer = b""
                logging.debug("Read %d bytes from pipe", len(self.frame))
            except BlockingIOError:
                # Frame is still incomplete; resume on the next callback.
                return
            except BrokenPipeError:
                logging.debug("Read pipe is closed")
                self._handle_eof()
            else:
                if not self.frame:
                    # A zero-length frame carries nothing to forward. Do not
                    # signal the read event: the waiter would wake with no
                    # frame and no EOF. Just wait for the next frame instead.
                    return

        self.read_event.set()

    def write_out(self):
        """Writer callback: write the pending frame to the tap."""
        if self.frame:
            try:
                w = os.write(self.out_fd, self.frame)
                if not w:
                    raise BrokenPipeError()
                logging.debug("Wrote %d bytes to tap", w)
                self.frame = b""
            except BlockingIOError:
                # Tap is not ready; retry on the next callback.
                return
            except BrokenPipeError:
                logging.debug("Tap is closed")
                self._handle_eof()

        self.write_event.set()
class TapToStreamPipe(Pipe):
    """Read frames from the tap and write them length-prefixed to a byte stream.

    ``frame`` holds the most recently read tap frame; ``buffer`` holds the
    encoded ``HEADER`` + payload bytes that still have to be written.
    """

    # Largest read issued against the tap, in bytes.
    # NOTE(review): 1522 presumably corresponds to a full-size Ethernet frame
    # including a VLAN tag -- confirm. Subclasses may override this, but the
    # value must stay within what HEADER can encode (at most 65535).
    MAX_FRAME_SIZE = 1522

    def read_in(self):
        """Reader callback: fetch one frame from the tap."""
        if not self.frame:
            try:
                d = os.read(self.in_fd, self.MAX_FRAME_SIZE)
                if not d:
                    raise BrokenPipeError()
                logging.debug("Read %d bytes from tap", len(d))
                self.frame = d
            except BlockingIOError:
                # Nothing to read yet; wait for the next callback.
                return
            except BrokenPipeError:
                logging.debug("Tap is closed")
                self._handle_eof()

        self.read_event.set()

    def write_out(self):
        """Writer callback: encode the pending frame and flush it to the stream."""
        if not self.buffer and self.frame:
            self.buffer = HEADER.pack(len(self.frame)) + self.frame
            self.frame = b""

        while self.buffer:
            try:
                w = os.write(self.out_fd, self.buffer)
                if not w:
                    raise BrokenPipeError()
                self.buffer = self.buffer[w:]
                logging.debug("Wrote %d bytes to pipe", w)
            except BlockingIOError:
                # Stream is full; keep the remainder for the next callback.
                return
            except BrokenPipeError:
                logging.debug("Write pipe is closed")
                self._handle_eof()
                # Stop retrying: the buffer is still non-empty, so continuing
                # the loop would hit the same error forever and block the
                # event loop.
                break

        self.write_event.set()
async def pipe_loop(tap_fd, out_fd, in_fd):
    """Forward frames in both directions between a tap and a stream pair.

    All three file descriptors are switched to non-blocking mode. EOF on
    either direction shuts down both.

    :param tap_fd: file descriptor of the tap device
    :param out_fd: file descriptor frames read from the tap are written to
    :param in_fd: file descriptor frames destined for the tap are read from
    :return: the result of gathering both stream coroutines
    """
    for fd in (tap_fd, out_fd, in_fd):
        os.set_blocking(fd, False)

    inbound = StreamToTapPipe(in_fd, tap_fd)
    outbound = TapToStreamPipe(tap_fd, out_fd)

    def shutdown_both():
        # Either side reaching EOF terminates both directions.
        inbound.set_eof()
        outbound.set_eof()

    for pipe in (inbound, outbound):
        pipe.eof_handler = shutdown_both

    return await asyncio.gather(inbound.stream(), outbound.stream())
def main():
    """Command line entry point: forward a tap file descriptor over stdio.

    Parses the tap FD and verbosity from the command line and configures
    logging to stderr. It then moves the original stdin/stdout to private
    descriptors, so nothing else in the process can disturb the framed
    stream, and runs :func:`pipe_loop` until it finishes.
    """
    # The text is the program description, not its name, so pass it as
    # ``description``. Passed positionally it would be taken as ``prog`` and
    # replace the program name in the usage line.
    parser = argparse.ArgumentParser(description="Stream tap to stdio")
    parser.add_argument("fd", type=int, help="Tap FD to stream to stdio")
    parser.add_argument("--verbose", "-v", action="count", default=-1, help="Increase verbosity")

    args = parser.parse_args()

    # The count starts at -1: no flag -> WARNING, -v -> INFO, -vv -> DEBUG.
    if args.verbose >= 1:
        level = logging.DEBUG
    elif args.verbose >= 0:
        level = logging.INFO
    else:
        level = logging.WARNING

    root = logging.getLogger()
    root.setLevel(level)

    fmt = logging.Formatter(f"{os.getpid()} - %(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(fmt)
    root.addHandler(handler)

    # Duplicate stdin and stdout to new file descriptors for dedicated use by
    # the stream.
    with os.fdopen(os.dup(sys.stdin.fileno())) as in_f, os.fdopen(os.dup(sys.stdout.fileno())) as out_f:
        # Replace stdin with devnull
        with open(os.devnull, "r+") as devnull:
            os.dup2(devnull.fileno(), sys.stdin.fileno())

        # Replace stdout with stderr
        os.dup2(sys.stderr.fileno(), sys.stdout.fileno())

        asyncio.run(pipe_loop(args.fd, out_f.fileno(), in_f.fileno()))
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)