Source code for labgrid.util.agents.netns

#!/usr/bin/env python

import base64
import ctypes
import fcntl
import os
import pickle
import socket
import struct
import subprocess
from pathlib import Path

CLONE_NEWNS = 0x00020000  # Mount namespace
CLONE_NEWUSER = 0x10000000  # User namespace
CLONE_NEWNET = 0x40000000  # Network namespace

libc = ctypes.CDLL("libc.so.6", use_errno=True)

libc.unshare.argtypes = [ctypes.c_int]
libc.unshare.restype = ctypes.c_int

libc.setns.argtypes = [ctypes.c_int, ctypes.c_int]
libc.setns.restype = ctypes.c_int

socket_table = {}


[docs] def b2s(b): return base64.b85encode(b).decode('ascii')
[docs] def s2b(s): return base64.b85decode(s.encode('ascii'))
[docs] def py2s(o): return b2s(pickle.dumps(o))
[docs] def s2py(s): return pickle.loads(s2b(s))
[docs] def unshare(flags): ret = libc.unshare(flags) if ret != 0: err = ctypes.get_errno() raise OSError(err, os.strerror(err)) return ret
[docs] def setns(fd, nstype=0): ret = libc.setns(fd, nstype) if ret != 0: err = ctypes.get_errno() raise OSError(err, os.strerror(err)) return ret
unshared = False
[docs] def handle_unshare(): global unshared assert not unshared uid = os.getuid() gid = os.getgid() unshare(CLONE_NEWUSER) unshare(CLONE_NEWNET) uidmap = Path("/proc/self/uid_map") uidmap.write_text(f"0 {uid} 1") setgroups = Path("/proc/self/setgroups") setgroups.write_text("deny") gidmap = Path("/proc/self/gid_map") gidmap.write_text(f"0 {gid} 1") # mount again from inside the netns, so that the correct devices are visible # subprocess.check_call(["mount", "-t", "sysfs", "sysfs", "/sys"]) # subprocess.check_call(['mount', '-t', 'devtmpfs', 'devtmpfs', '/dev']) # subprocess.check_call(['mount', '-t', 'proc', 'proc', '/proc']) subprocess.check_call(["ip", "link", "set", "lo", "up"]) unshared = True return os.getpid()
[docs] def handle_create_tun(*, address=None): with os.fdopen(os.open("/dev/net/tun", os.O_RDWR)) as dev_tun: TUNSETIFF = 0x400454CA IFF_TAP = 0x0002 IFF_NO_PI = 0x1000 ifr = struct.pack("16sH22s", b"tap0", IFF_TAP | IFF_NO_PI, b"\x00" * 22) fcntl.ioctl(dev_tun, TUNSETIFF, ifr) subprocess.run(["ip", "link", "set", "up", "tap0"], check=True) if address: subprocess.run(["ip", "link", "set", "address", address, "dev", "tap0"], check=True) return ("", os.fdopen(os.dup(dev_tun.fileno())))
[docs] def handle_socket(*args, **kwargs): try: # The socket creation parameters are constant integers defined by the # kernel. Since it's not possible to pass a file descriptor from a # remote agent, there is no danger of these constants being out of sync # so pass them directly instead of using strings. s = socket.socket(*args, **kwargs) socket_table[id(s)] = s return ({"id": id(s)}, s.dup()) except OSError as e: return ({"error": [e.errno, e.strerror]}, None)
[docs] def handle_socket_close(sockid): if sockid in socket_table: socket_table[sockid].close() del socket_table[sockid]
[docs] def handle_socket_dup(sockid): try: s = socket_table[sockid].dup() socket_table[id(s)] = s return ({"id": id(s)}, s.dup()) except OSError as e: return ({"error": [e.errno, e.strerror]}, None)
[docs] def handle_socket_call(sockid, func, args, kwargs): try: s = socket_table[sockid] ret = getattr(s, func)(*s2py(args), **s2py(kwargs)) return (0, py2s(ret)) except OSError as e: return ([e.errno, e.strerror], None)
[docs] def handle_list_sockets(): return list(socket_table.keys())
[docs] def handle_get_prefix(): """Returns the command prefix to use to execute commands in the namespace""" return ["nsenter", "-t", str(os.getpid()), "-U", "-n", "-m", "--preserve-credentials"]
[docs] def handle_get_pid(): """Returns the PID of the namespace""" return os.getpid()
[docs] def handle_get_intf(): return "tap0"
[docs] def handle_getaddrinfo(*args, **kwargs): """ Call getaddrinfo in the namespace. """ result = [] try: for info in socket.getaddrinfo(*args, **kwargs): result.append(info) return (0, result) except OSError as e: return ((e.errno, e.strerror), result)
methods = { "unshare": handle_unshare, "create_tun": handle_create_tun, "create_socket": handle_socket, "get_links": handle_get_links, "get_prefix": handle_get_prefix, "get_pid": handle_get_pid, "get_intf": handle_get_intf, "socket_call": handle_socket_call, "socket_close": handle_socket_close, "socket_dup": handle_socket_dup, "list_sockets": handle_list_sockets, "getaddrinfo": handle_getaddrinfo, }