#!/usr/bin/env python
import base64
import ctypes
import fcntl
import os
import pickle
import socket
import struct
import subprocess
from pathlib import Path
CLONE_NEWNS = 0x00020000 # Mount namespace
CLONE_NEWUSER = 0x10000000 # User namespace
CLONE_NEWNET = 0x40000000 # Network namespace
libc = ctypes.CDLL("libc.so.6", use_errno=True)
libc.unshare.argtypes = [ctypes.c_int]
libc.unshare.restype = ctypes.c_int
libc.setns.argtypes = [ctypes.c_int, ctypes.c_int]
libc.setns.restype = ctypes.c_int
socket_table = {}
[docs]
def b2s(b):
return base64.b85encode(b).decode('ascii')
[docs]
def s2b(s):
return base64.b85decode(s.encode('ascii'))
[docs]
def py2s(o):
return b2s(pickle.dumps(o))
[docs]
def s2py(s):
return pickle.loads(s2b(s))
[docs]
def unshare(flags):
ret = libc.unshare(flags)
if ret != 0:
err = ctypes.get_errno()
raise OSError(err, os.strerror(err))
return ret
[docs]
def setns(fd, nstype=0):
ret = libc.setns(fd, nstype)
if ret != 0:
err = ctypes.get_errno()
raise OSError(err, os.strerror(err))
return ret
unshared = False
[docs]
def handle_unshare():
global unshared
assert not unshared
uid = os.getuid()
gid = os.getgid()
unshare(CLONE_NEWUSER)
unshare(CLONE_NEWNET)
uidmap = Path("/proc/self/uid_map")
uidmap.write_text(f"0 {uid} 1")
setgroups = Path("/proc/self/setgroups")
setgroups.write_text("deny")
gidmap = Path("/proc/self/gid_map")
gidmap.write_text(f"0 {gid} 1")
# mount again from inside the netns, so that the correct devices are visible
# subprocess.check_call(["mount", "-t", "sysfs", "sysfs", "/sys"])
# subprocess.check_call(['mount', '-t', 'devtmpfs', 'devtmpfs', '/dev'])
# subprocess.check_call(['mount', '-t', 'proc', 'proc', '/proc'])
subprocess.check_call(["ip", "link", "set", "lo", "up"])
unshared = True
return os.getpid()
[docs]
def handle_create_tun(*, address=None):
with os.fdopen(os.open("/dev/net/tun", os.O_RDWR)) as dev_tun:
TUNSETIFF = 0x400454CA
IFF_TAP = 0x0002
IFF_NO_PI = 0x1000
ifr = struct.pack("16sH22s", b"tap0", IFF_TAP | IFF_NO_PI, b"\x00" * 22)
fcntl.ioctl(dev_tun, TUNSETIFF, ifr)
subprocess.run(["ip", "link", "set", "up", "tap0"], check=True)
if address:
subprocess.run(["ip", "link", "set", "address", address, "dev", "tap0"], check=True)
return ("", os.fdopen(os.dup(dev_tun.fileno())))
[docs]
def handle_socket(*args, **kwargs):
try:
# The socket creation parameters are constant integers defined by the
# kernel. Since it's not possible to pass a file descriptor from a
# remote agent, there is no danger of these constants being out of sync
# so pass them directly instead of using strings.
s = socket.socket(*args, **kwargs)
socket_table[id(s)] = s
return ({"id": id(s)}, s.dup())
except OSError as e:
return ({"error": [e.errno, e.strerror]}, None)
[docs]
def handle_socket_close(sockid):
if sockid in socket_table:
socket_table[sockid].close()
del socket_table[sockid]
[docs]
def handle_socket_dup(sockid):
try:
s = socket_table[sockid].dup()
socket_table[id(s)] = s
return ({"id": id(s)}, s.dup())
except OSError as e:
return ({"error": [e.errno, e.strerror]}, None)
[docs]
def handle_socket_call(sockid, func, args, kwargs):
try:
s = socket_table[sockid]
ret = getattr(s, func)(*s2py(args), **s2py(kwargs))
return (0, py2s(ret))
except OSError as e:
return ([e.errno, e.strerror], None)
[docs]
def handle_list_sockets():
return list(socket_table.keys())
[docs]
def handle_get_links():
import json
output = subprocess.check_output(["ip", "-j", "link"])
return json.loads(output)
[docs]
def handle_get_prefix():
"""Returns the command prefix to use to execute commands in the namespace"""
return ["nsenter", "-t", str(os.getpid()), "-U", "-n", "-m", "--preserve-credentials"]
[docs]
def handle_get_pid():
"""Returns the PID of the namespace"""
return os.getpid()
[docs]
def handle_get_intf():
return "tap0"
[docs]
def handle_getaddrinfo(*args, **kwargs):
"""
Call getaddrinfo in the namespace.
"""
result = []
try:
for info in socket.getaddrinfo(*args, **kwargs):
result.append(info)
return (0, result)
except OSError as e:
return ((e.errno, e.strerror), result)
methods = {
"unshare": handle_unshare,
"create_tun": handle_create_tun,
"create_socket": handle_socket,
"get_links": handle_get_links,
"get_prefix": handle_get_prefix,
"get_pid": handle_get_pid,
"get_intf": handle_get_intf,
"socket_call": handle_socket_call,
"socket_close": handle_socket_close,
"socket_dup": handle_socket_dup,
"list_sockets": handle_list_sockets,
"getaddrinfo": handle_getaddrinfo,
}