Source code for labgrid.util.netns

import contextlib
import errno
import functools
import logging
import os
import random
import socket
import string
import subprocess
import sys
import tempfile

from . import get_free_port
from .agent import py2s, s2py


[docs] class NSSocket(socket.socket): """ Proxy class for sockets from a different netns. There are certain socket operations that must be called in the network namespace to work properly. Instead of making custom wrappers for thes in the NetNamespace object, the NSSocket class derives from `socket.socket`, but transparently redirects applicable socket calls into the namespace. This relies on the namespace tracking a mirror socket file descriptor for each created NSSocket, and when NSSocket forwards calls into the network namespace, it performs the operation on the mirror socket. This works because multiple file descriptors can refer to the same actual socket, even in different namespaces. """ # __init__ must socket.socket() constructor, due to the way python uses it # internally. _attach_remote_sock() can be used after the socket is created # to attach the remote socket ID
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__sockid = None self.__netns = None
def _attach_remote_sock(self, sockid, netns): self.__sockid = sockid self.__netns = netns if sockid: self.__forward_function("connect") self.__forward_function("connect_ex") self.__forward_function("bind") return self def __close_remote_sock(self): if self.__sockid: self.__netns.socket_close(self.__sockid) self.__sockid = None def __forward_function(self, name): def wrap(*args, **kwargs): return self.__netns_call(name, args, kwargs) setattr(self, name, wrap) def __netns_call(self, func, args, kwargs): if not self.__sockid: raise OSError(errno.EBADF, os.strerror(errno.EBADF)) err, ret = self.__netns.socket_call(self.__sockid, func, py2s(args), py2s(kwargs)) if err != 0: raise OSError(*err) return s2py(ret)
[docs] def close(self): self.__close_remote_sock() return super().close()
[docs] def detach(self): # NOTE: The detached file descriptor no longer forwards API to the # namespace self.__close_remote_sock() return super().detach()
[docs] def dup(self): if self.__sockid: ret, fd = self.__netns.socket_dup(self.__sockid) if "error" in ret: raise OSError(*ret["error"]) s = self.__class__(fileno=fd)._attach_remote_sock(ret["id"], self.__netns) s.settimeout(self.gettimeout()) return s else: return super().dup()
[docs] class NetNamespace:
[docs] @classmethod def create(cls, agentwrapper, mac_address=None): netns = agentwrapper.load("netns") netns.unshare() netns.create_tun(address=mac_address) return cls(netns)
[docs] def __init__(self, agent): self.logger = logging.getLogger(self.__class__.__name__) self._agent = agent
[docs] @functools.cached_property def prefix(self): return self._agent.get_prefix()
[docs] @functools.cached_property def intf(self): return self._agent.get_intf()
[docs] def ifindex(self): links = self._agent.get_links() for intf in links: if intf["ifname"] == self.intf: return intf["ifindex"] raise KeyError(f"Interface {self.intf} not found")
def _get_cmd(self, command): if isinstance(command, str): return self.prefix + ["--wd=" + os.getcwd(), "--", "/bin/sh", "-c", command] return self.prefix + ["--wd=" + os.getcwd(), "--"] + command
[docs] def run(self, command, **kwargs): cmd = self._get_cmd(command) self.logger.debug("Running %s", cmd) return subprocess.run(cmd, **kwargs)
[docs] def Popen(self, command, **kwargs): cmd = self._get_cmd(command) self.logger.debug("Popen %s", cmd) return subprocess.Popen(cmd, **kwargs)
@contextlib.contextmanager def _create_script(self, script): with tempfile.NamedTemporaryFile("w") as s: s.write(script) s.flush() yield [sys.executable, s.name]
[docs] def run_script(self, script, script_args=[], **kwargs): with self._create_script(script) as command: return self.run(command + script_args, **kwargs)
[docs] @contextlib.contextmanager def Popen_script(self, script, script_args=[], **kwargs): with self._create_script(script) as command: with self.Popen(command + script_args, **kwargs) as p: yield p
[docs] def socket(self, *args, **kwargs): ret, fd = self._agent.create_socket(*args, **kwargs) if "error" in ret: raise OSError(*ret["error"]) return NSSocket(fileno=fd)._attach_remote_sock(ret["id"], self._agent)
[docs] def getaddrinfo(self, *args, **kwargs): err, result = self._agent.getaddrinfo(*args, **kwargs) if err: raise OSError(*err) return result
[docs] @contextlib.contextmanager def socks_proxy(self): port = get_free_port() password = "".join(random.choice(string.ascii_letters + string.digits) for i in range(10)) with contextlib.ExitStack() as stack: proxy = stack.enter_context( self.Popen( ["microsocks", "-u", "labgrid", "-P", password, "-i", "127.0.0.1"], ) ) stack.callback(lambda: proxy.terminate()) s = stack.enter_context(socket.socket()) s.bind(("127.0.0.1", port)) s.listen() socat = stack.enter_context( self.Popen( ["socat", f"ACCEPT-FD:{s.fileno()},fork", "TCP4:127.0.0.1:1080"], pass_fds=(s.fileno(),), ) ) stack.callback(lambda: socat.terminate()) stack.callback(lambda: print("A")) s.close() yield (port, password)