import contextlib
import errno
import functools
import logging
import os
import random
import secrets
import socket
import string
import subprocess
import sys
import tempfile

from . import get_free_port
from .agent import py2s, s2py
class NSSocket(socket.socket):
    """
    Proxy class for sockets from a different netns.

    There are certain socket operations that must be called in the network
    namespace to work properly. Instead of making custom wrappers for these in
    the NetNamespace object, the NSSocket class derives from `socket.socket`,
    but transparently redirects applicable socket calls into the namespace.

    This relies on the namespace tracking a mirror socket file descriptor for
    each created NSSocket, and when NSSocket forwards calls into the network
    namespace, it performs the operation on the mirror socket. This works
    because multiple file descriptors can refer to the same actual socket, even
    in different namespaces.
    """

    # __init__ must match the socket.socket() constructor, due to the way
    # python uses it internally (e.g. socket.dup() instantiates
    # self.__class__(family, type, proto, fileno=fd)). _attach_remote_sock()
    # can be used after the socket is created to attach the remote socket ID
    def __init__(self, *args, **kwargs):
        """Create the local socket; no remote socket is attached yet."""
        super().__init__(*args, **kwargs)
        self.__sockid = None
        self.__netns = None

    def _attach_remote_sock(self, sockid, netns):
        """
        Associate this socket with remote socket ``sockid`` handled by ``netns``.

        When ``sockid`` is truthy, ``connect``, ``connect_ex`` and ``bind`` are
        replaced on this instance by wrappers that forward the call to
        ``netns``. Returns ``self`` so the call can be chained onto the
        constructor.
        """
        self.__sockid = sockid
        self.__netns = netns
        if sockid:
            for name in ("connect", "connect_ex", "bind"):
                self.__forward_function(name)
        return self

    def __close_remote_sock(self):
        """Ask the namespace to close the remote socket, if one is attached."""
        if self.__sockid:
            self.__netns.socket_close(self.__sockid)
            # Only forget the ID once the remote side accepted the close
            self.__sockid = None

    def __forward_function(self, name):
        """Shadow method ``name`` on this instance with a forwarding wrapper."""

        def wrap(*args, **kwargs):
            return self.__netns_call(name, args, kwargs)

        setattr(self, name, wrap)

    def __netns_call(self, func, args, kwargs):
        """
        Invoke socket method ``func`` on the remote socket.

        Raises ``OSError(EBADF)`` if no remote socket is attached (for example
        after ``close()``), or ``OSError(*err)`` if the remote call reports an
        error. Arguments and the result are converted with ``py2s``/``s2py``.
        """
        if not self.__sockid:
            raise OSError(errno.EBADF, os.strerror(errno.EBADF))

        err, ret = self.__netns.socket_call(
            self.__sockid, func, py2s(args), py2s(kwargs)
        )
        # Truthiness check (same convention as NetNamespace.getaddrinfo):
        # treats both 0 and None as "no error" instead of attempting
        # OSError(*None)
        if err:
            raise OSError(*err)
        return s2py(ret)

    def close(self):
        """
        Close the remote socket (if attached) and then the local socket.

        The local socket is closed even when closing the remote socket raises,
        so a failing namespace connection cannot leak the local file
        descriptor; the exception still propagates to the caller.
        """
        try:
            self.__close_remote_sock()
        finally:
            super().close()

    def detach(self):
        """Close the remote socket and detach the local file descriptor."""
        # NOTE: The detached file descriptor no longer forwards API to the
        # namespace
        self.__close_remote_sock()
        return super().detach()

    def dup(self):
        """
        Duplicate the socket.

        With a remote socket attached, the namespace is asked to duplicate it
        and the new NSSocket is attached to the returned remote ID, inheriting
        this socket's timeout. Otherwise this falls back to
        ``socket.socket.dup()``.

        Raises ``OSError`` if the namespace reports an error.
        """
        if not self.__sockid:
            return super().dup()

        ret, fd = self.__netns.socket_dup(self.__sockid)
        if "error" in ret:
            raise OSError(*ret["error"])

        s = self.__class__(fileno=fd)._attach_remote_sock(ret["id"], self.__netns)
        s.settimeout(self.gettimeout())
        return s
class NetNamespace:
    """
    Handle for a network namespace that is driven through an agent object.

    Every operation is delegated to the agent given to ``__init__``; commands
    are executed by prepending the agent-provided ``prefix`` to the command
    line.
    """

    @classmethod
    def create(cls, agentwrapper, mac_address=None):
        """
        Load the ``netns`` agent from ``agentwrapper`` and set it up.

        Calls ``unshare()`` and ``create_tun(address=mac_address)`` on the
        loaded agent and returns a new instance wrapping it.
        """
        netns = agentwrapper.load("netns")
        netns.unshare()
        netns.create_tun(address=mac_address)
        return cls(netns)

    def __init__(self, agent):
        self.logger = logging.getLogger(self.__class__.__name__)
        self._agent = agent

    @functools.cached_property
    def prefix(self):
        """Command prefix reported by the agent (fetched once, then cached)."""
        return self._agent.get_prefix()

    @functools.cached_property
    def intf(self):
        """Interface name reported by the agent (fetched once, then cached)."""
        return self._agent.get_intf()

    def ifindex(self):
        """
        Return the ``ifindex`` of the link whose ``ifname`` equals ``intf``.

        Raises ``KeyError`` if the agent reports no such link.
        """
        for link in self._agent.get_links():
            if link["ifname"] == self.intf:
                return link["ifindex"]
        raise KeyError(f"Interface {self.intf} not found")

    def get_links(self):
        """Return the links reported by the agent."""
        return self._agent.get_links()

    def _get_cmd(self, command):
        """
        Build the full argument list used to execute ``command``.

        A string is run through ``/bin/sh -c``; any other sequence of
        arguments (list, tuple, ...) is appended as-is. The current working
        directory is passed along via ``--wd=``.
        """
        base = self.prefix + ["--wd=" + os.getcwd(), "--"]
        if isinstance(command, str):
            return base + ["/bin/sh", "-c", command]
        # list() so tuples and other sequences work, not only lists
        return base + list(command)

    def run(self, command, **kwargs):
        """Run ``command`` via ``subprocess.run``; ``kwargs`` are passed through."""
        cmd = self._get_cmd(command)
        self.logger.debug("Running %s", cmd)
        return subprocess.run(cmd, **kwargs)

    def Popen(self, command, **kwargs):
        """Start ``command`` via ``subprocess.Popen``; ``kwargs`` are passed through."""
        cmd = self._get_cmd(command)
        self.logger.debug("Popen %s", cmd)
        return subprocess.Popen(cmd, **kwargs)

    @contextlib.contextmanager
    def _create_script(self, script):
        """
        Write ``script`` to a temporary file and yield the command to run it.

        The yielded value is ``[sys.executable, <path>]``. The temporary file
        is removed when the context exits.
        """
        with tempfile.NamedTemporaryFile("w") as s:
            s.write(script)
            # Make sure the content is on disk before another process reads it
            s.flush()
            yield [sys.executable, s.name]

    def run_script(self, script, script_args=None, **kwargs):
        """
        Run Python source ``script`` with optional ``script_args``.

        ``kwargs`` are passed through to ``run()``; its result is returned.
        """
        extra = [] if script_args is None else list(script_args)
        with self._create_script(script) as command:
            return self.run(command + extra, **kwargs)

    @contextlib.contextmanager
    def Popen_script(self, script, script_args=None, **kwargs):
        """
        Context manager that starts Python source ``script`` and yields the
        ``Popen`` object. ``kwargs`` are passed through to ``Popen()``.
        """
        extra = [] if script_args is None else list(script_args)
        with self._create_script(script) as command:
            with self.Popen(command + extra, **kwargs) as p:
                yield p

    def socket(self, *args, **kwargs):
        """
        Create a socket through the agent and return it as an ``NSSocket``.

        Raises ``OSError`` if the agent reports an error.
        """
        ret, fd = self._agent.create_socket(*args, **kwargs)
        if "error" in ret:
            raise OSError(*ret["error"])
        return NSSocket(fileno=fd)._attach_remote_sock(ret["id"], self._agent)

    def getaddrinfo(self, *args, **kwargs):
        """
        Resolve an address through the agent.

        Raises ``OSError`` if the agent reports an error.
        """
        err, result = self._agent.getaddrinfo(*args, **kwargs)
        if err:
            raise OSError(*err)
        return result

    @contextlib.contextmanager
    def socks_proxy(self):
        """
        Context manager that runs a SOCKS proxy reachable on 127.0.0.1.

        Starts ``microsocks`` (user ``labgrid``, random password) and a
        ``socat`` process that accepts connections on a local listening socket
        and forwards them to ``127.0.0.1:1080``. Both are launched through
        ``Popen()``. Yields ``(port, password)``; both processes are
        terminated when the context exits.
        """
        port = get_free_port()
        # The password guards access to the proxy, so draw it from the
        # cryptographically secure generator rather than `random`
        alphabet = string.ascii_letters + string.digits
        password = "".join(secrets.choice(alphabet) for _ in range(10))

        with contextlib.ExitStack() as stack:
            # No explicit port is given to microsocks; socat below connects to
            # port 1080, which is the microsocks default
            proxy = stack.enter_context(
                self.Popen(
                    ["microsocks", "-u", "labgrid", "-P", password, "-i", "127.0.0.1"],
                )
            )
            # Callbacks run in reverse order, so terminate() happens before
            # the Popen context waits for the process
            stack.callback(proxy.terminate)

            s = stack.enter_context(socket.socket())
            s.bind(("127.0.0.1", port))
            s.listen()

            socat = stack.enter_context(
                self.Popen(
                    ["socat", f"ACCEPT-FD:{s.fileno()},fork", "TCP4:127.0.0.1:1080"],
                    pass_fds=(s.fileno(),),
                )
            )
            stack.callback(socat.terminate)

            # socat was handed the listening descriptor; the local copy is no
            # longer needed
            s.close()

            yield (port, password)