xref: /freebsd/tests/atf_python/sys/net/vnet.py (revision 924226fba12cc9a228c73b956e1b7fa24c60b055)
1#!/usr/local/bin/python3
2import os
3import socket
4import time
5from ctypes import cdll
6from ctypes import get_errno
7from ctypes.util import find_library
8from typing import List
9from typing import Optional
10
11
def run_cmd(cmd: str) -> str:
    """Run *cmd* through os.popen() and return everything it wrote to stdout.

    The command line is echoed with print() first, so it appears in the
    test output next to whatever the command itself produces.
    """
    print("run: '{}'".format(cmd))
    # Close the pipe explicitly so the child process is waited for right
    # away rather than whenever the popen object gets garbage-collected.
    with os.popen(cmd) as pipe:
        return pipe.read()
15
16
class VnetInterface(object):
    """Handle on a network interface that is driven through ifconfig(8)."""

    # Names of all interfaces created so far are appended to this file,
    # one per line, so cleanup_ifaces() can destroy them afterwards.
    INTERFACES_FNAME = "created_interfaces.lst"

    # defines from net/if_types.h
    IFT_LOOP = 0x18
    IFT_ETHER = 0x06

    def __init__(self, iface_name: str):
        """Wrap the interface called *iface_name*.

        Names starting with "lo" are classified as IFT_LOOP, everything
        else as IFT_ETHER.
        """
        self.name = iface_name
        self.vnet_name = ""
        self.jailed = False
        is_loopback = iface_name.startswith("lo")
        self.iftype = self.IFT_LOOP if is_loopback else self.IFT_ETHER

    @property
    def ifindex(self):
        """Interface index as reported by socket.if_nametoindex()."""
        return socket.if_nametoindex(self.name)

    def set_vnet(self, vnet_name: str):
        """Record the name of the vnet jail this interface belongs to."""
        self.vnet_name = vnet_name

    def set_jailed(self, jailed: bool):
        """Record the jailed flag; when set, run_cmd() stops adding jexec."""
        self.jailed = jailed

    def run_cmd(self, cmd):
        """Run *cmd*, wrapped in jexec when a vnet is set and not jailed.

        The command output is not returned.
        """
        if self.jailed or not self.vnet_name:
            # No vnet recorded, or the jailed flag is set: run as-is.
            run_cmd(cmd)
            return
        run_cmd("jexec {} {}".format(self.vnet_name, cmd))

    @staticmethod
    def file_append_line(line):
        """Append *line* plus a newline to the created-interfaces list."""
        with open(VnetInterface.INTERFACES_FNAME, "a") as listing:
            listing.write(line + "\n")

    @classmethod
    def create_iface(cls, iface_name: str):
        """Create an interface with ifconfig and return an instance for it.

        The name printed by ifconfig is recorded for later cleanup; for an
        epair, the same name with its last character replaced by "b" is
        recorded as well. Raises Exception when ifconfig prints no name.
        """
        created = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
        if not created:
            raise Exception("Unable to create iface {}".format(iface_name))
        to_record = [created]
        if created.startswith("epair"):
            to_record.append(created[:-1] + "b")
        for entry in to_record:
            cls.file_append_line(entry)
        return cls(created)

    @staticmethod
    def cleanup_ifaces():
        """Destroy every recorded interface, then remove the list file.

        Best effort: any exception (e.g. the list file not existing) is
        swallowed.
        """
        listing_path = VnetInterface.INTERFACES_FNAME
        try:
            with open(listing_path, "r") as listing:
                for entry in listing:
                    run_cmd("/sbin/ifconfig {} destroy".format(entry.strip()))
            os.unlink(listing_path)
        except Exception:
            pass

    def setup_addr(self, addr: str):
        """Assign *addr* to the interface; a ":" in it selects inet6."""
        family = "inet6" if ":" in addr else "inet"
        self.run_cmd("/sbin/ifconfig {} {} {}".format(self.name, family, addr))

    def delete_addr(self, addr: str):
        """Remove *addr* from the interface; a ":" in it selects inet6."""
        if ":" in addr:
            template = "/sbin/ifconfig {} inet6 {} delete"
        else:
            template = "/sbin/ifconfig {} -alias {}"
        self.run_cmd(template.format(self.name, addr))

    def turn_up(self):
        """Run `ifconfig <name> up` for this interface."""
        self.run_cmd("/sbin/ifconfig {} up".format(self.name))

    def enable_ipv6(self):
        """Run `ndp -i <name> -disabled` for this interface.

        NOTE(review): presumably this clears the ND "disabled" flag so that
        IPv6 works on the interface -- confirm against ndp(8).
        """
        self.run_cmd("/usr/sbin/ndp -i {} -disabled".format(self.name))
95
96
class VnetInstance(object):
    """A vnet jail together with the interfaces that were handed to it."""

    # Names of all jails created so far are appended to this file, one per
    # line, so cleanup_vnets() can remove them afterwards.
    JAILS_FNAME = "created_jails.lst"

    def __init__(self, vnet_name: str, jid: int, ifaces: List["VnetInterface"]):
        """Store the jail name/id and mark each interface as living in it."""
        self.name = vnet_name
        self.jid = jid
        self.ifaces = ifaces
        for iface in ifaces:
            iface.set_vnet(vnet_name)
            iface.set_jailed(True)

    def run_vnet_cmd(self, cmd):
        """Run *cmd* through jexec in this jail and return its output.

        When the jail name is empty the command is run unwrapped.
        """
        # The jail name is kept in self.name; this class has no vnet_name
        # attribute, so reading one would raise AttributeError.
        if self.name:
            cmd = "jexec {} {}".format(self.name, cmd)
        return run_cmd(cmd)

    @staticmethod
    def wait_interface(vnet_name: str, iface_name: str):
        """Poll `ifconfig -l` inside the jail until *iface_name* is listed.

        Returns True as soon as the interface shows up, or False after 50
        attempts with a 0.1 second pause after each failed one.
        """
        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
        for _ in range(50):
            ifaces = run_cmd(cmd).strip().split(" ")
            if iface_name in ifaces:
                return True
            time.sleep(0.1)
        return False

    @staticmethod
    def file_append_line(line):
        """Append *line* plus a newline to the created-jails list."""
        with open(VnetInstance.JAILS_FNAME, "a") as f:
            f.write(line + "\n")

    @staticmethod
    def cleanup_vnets():
        """Remove every recorded jail, then delete the list file.

        Best effort: any exception (e.g. the list file not existing) is
        deliberately swallowed so cleanup never fails.
        """
        try:
            with open(VnetInstance.JAILS_FNAME) as f:
                for line in f:
                    run_cmd("/usr/sbin/jail -r {}".format(line.strip()))
            os.unlink(VnetInstance.JAILS_FNAME)
        except Exception:
            pass

    @classmethod
    def create_with_interfaces(cls, vnet_name: str, ifaces: List["VnetInterface"]):
        """Create a persistent vnet jail owning *ifaces* and return it.

        The jail name is recorded for later cleanup, then each interface is
        awaited inside the jail. Raises Exception when the reported jail id
        is not positive or an interface never appears; a non-numeric jail(8)
        output surfaces as the ValueError raised by int().
        """
        iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
            vnet_name, iface_cmds
        )
        jid_str = run_cmd(cmd)
        jid = int(jid_str)
        if jid <= 0:
            raise Exception("Jail creation failed, output: {}".format(jid))
        cls.file_append_line(vnet_name)

        for iface in ifaces:
            if cls.wait_interface(vnet_name, iface.name):
                continue
            raise Exception(
                "Interface {} has not appeared in vnet {}".format(iface.name, vnet_name)
            )
        return cls(vnet_name, jid, ifaces)

    @staticmethod
    def attach_jid(jid: int):
        """Call libc's jail_attach() with *jid*.

        Raises Exception when libc cannot be located or when jail_attach()
        returns non-zero; the message carries the errno of the failed call.
        """
        # Imported here because the file-level imports only provide `cdll`,
        # whose LoadLibrary() offers no way to request errno tracking.
        from ctypes import CDLL

        _path: Optional[str] = find_library("c")
        if _path is None:
            raise Exception("libc not found")
        path: str = _path
        # use_errno=True makes ctypes capture errno after each foreign call;
        # without it get_errno() would not reflect the jail_attach() failure.
        libc = CDLL(path, use_errno=True)
        if libc.jail_attach(jid) != 0:
            raise Exception("jail_attach() failed: errno {}".format(get_errno()))

    def attach(self):
        """Call attach_jid() with this jail's id."""
        self.attach_jid(self.jid)
170
171
class SingleVnetTestTemplate(object):
    """Base class for tests that run attached to a single fresh vnet jail.

    The class attributes below are read by setup_method().
    """

    # How many epair interfaces setup_method() creates for the jail.
    num_epairs = 1
    # Addresses to configure: entry i is applied to the jail's i-th
    # interface; falsy entries are skipped.
    IPV6_PREFIXES: List[str] = []
    IPV4_PREFIXES: List[str] = []

    def setup_method(self, method):
        """Create the epairs and the jail, attach to it, assign addresses.

        The jail is named "jail_<method.__name__>". *method* is presumably
        the test function handed over by the test runner; only its
        __name__ is used.
        """
        test_name = method.__name__
        vnet_name = "jail_{}".format(test_name)
        ifaces = [VnetInterface.create_iface("epair") for _ in range(self.num_epairs)]
        self.vnet = VnetInstance.create_with_interfaces(vnet_name, ifaces)
        self.vnet.attach()
        for i, addr in enumerate(self.IPV6_PREFIXES):
            if addr:
                iface = self.vnet.ifaces[i]
                iface.turn_up()
                iface.enable_ipv6()
                iface.setup_addr(addr)
        for i, addr in enumerate(self.IPV4_PREFIXES):
            if addr:
                iface = self.vnet.ifaces[i]
                iface.turn_up()
                iface.setup_addr(addr)

    def cleanup(self, nodeid: str):
        """Remove all recorded jails, then all recorded interfaces.

        *nodeid* is accepted but not used.
        """
        print("==== vnet cleanup ===")
        VnetInstance.cleanup_vnets()
        VnetInterface.cleanup_ifaces()

    def run_cmd(self, cmd: str) -> str:
        """Run *cmd* through os.popen() and return what it wrote to stdout."""
        # Close the pipe explicitly so the child process is waited for
        # instead of lingering until the popen object is garbage-collected.
        with os.popen(cmd) as pipe:
            return pipe.read()
204