xref: /freebsd/tests/atf_python/sys/net/vnet.py (revision 058ac3e8063366dafa634d9107642e12b038bf09)
1#!/usr/local/bin/python3
2import copy
3import ipaddress
4import os
5import socket
6import sys
7import time
8from multiprocessing import Pipe
9from multiprocessing import Process
10from typing import Dict
11from typing import List
12from typing import NamedTuple
13
14from atf_python.sys.net.tools import ToolsHelper
15from atf_python.utils import libc, BaseTest
16
17
18def run_cmd(cmd: str, verbose=True) -> str:
19    print("run: '{}'".format(cmd))
20    return os.popen(cmd).read()
21
22
23def convert_test_name(test_name: str) -> str:
24    """Convert test name to a string that can be used in the file/jail names"""
25    ret = ""
26    for char in test_name:
27        if char.isalnum() or char in ("_", "-"):
28            ret += char
29        elif char in ("["):
30            ret += "_"
31    return ret
32
33
34class VnetInterface(object):
35    # defines from net/if_types.h
36    IFT_LOOP = 0x18
37    IFT_ETHER = 0x06
38
39    def __init__(self, iface_alias: str, iface_name: str):
40        self.name = iface_name
41        self.alias = iface_alias
42        self.vnet_name = ""
43        self.jailed = False
44        self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
45        self.prefixes4: List[List[str]] = []
46        self.prefixes6: List[List[str]] = []
47        if iface_name.startswith("lo"):
48            self.iftype = self.IFT_LOOP
49        else:
50            self.iftype = self.IFT_ETHER
51
52    @property
53    def ifindex(self):
54        return socket.if_nametoindex(self.name)
55
56    @property
57    def first_ipv6(self):
58        d = self.addr_map["inet6"]
59        return d[next(iter(d))]
60
61    @property
62    def first_ipv4(self):
63        d = self.addr_map["inet"]
64        return d[next(iter(d))]
65
66    def set_vnet(self, vnet_name: str):
67        self.vnet_name = vnet_name
68
69    def set_jailed(self, jailed: bool):
70        self.jailed = jailed
71
72    def run_cmd(
73        self,
74        cmd,
75        verbose=False,
76    ):
77        if self.vnet_name and not self.jailed:
78            cmd = "jexec {} {}".format(self.vnet_name, cmd)
79        return run_cmd(cmd, verbose)
80
81    @classmethod
82    def setup_loopback(cls, vnet_name: str):
83        lo = VnetInterface("", "lo0")
84        lo.set_vnet(vnet_name)
85        lo.turn_up()
86
87    @classmethod
88    def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
89        name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
90        if not name:
91            raise Exception("Unable to create iface {}".format(iface_name))
92        ret = [cls(alias_name, name)]
93        if name.startswith("epair"):
94            ret.append(cls(alias_name, name[:-1] + "b"))
95        return ret
96
97    def setup_addr(self, _addr: str):
98        addr = ipaddress.ip_interface(_addr)
99        if addr.version == 6:
100            family = "inet6"
101            cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
102        else:
103            family = "inet"
104            if self.addr_map[family]:
105                cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr)
106            else:
107                cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
108        self.run_cmd(cmd)
109        self.addr_map[family][str(addr.ip)] = addr
110
111    def delete_addr(self, _addr: str):
112        addr = ipaddress.ip_address(_addr)
113        if addr.version == 6:
114            family = "inet6"
115            cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
116        else:
117            family = "inet"
118            cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
119        self.run_cmd(cmd)
120        del self.addr_map[family][str(addr)]
121
122    def turn_up(self):
123        cmd = "/sbin/ifconfig {} up".format(self.name)
124        self.run_cmd(cmd)
125
126    def enable_ipv6(self):
127        cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
128        self.run_cmd(cmd)
129
130    def has_tentative(self) -> bool:
131        """True if an interface has some addresses in tenative state"""
132        cmd = "/sbin/ifconfig {} inet6".format(self.name)
133        out = self.run_cmd(cmd, verbose=False)
134        for line in out.splitlines():
135            if "tentative" in line:
136                return True
137        return False
138
139
140class IfaceFactory(object):
141    INTERFACES_FNAME = "created_ifaces.lst"
142
143    def __init__(self, test_name: str):
144        self.test_name = test_name
145        self.test_id = convert_test_name(test_name)
146        self.file_name = self.INTERFACES_FNAME
147
148    def _register_iface(self, iface_name: str):
149        with open(self.file_name, "a") as f:
150            f.write(iface_name + "\n")
151
152    def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]:
153        ifaces = VnetInterface.create_iface(alias_name, iface_name)
154        for iface in ifaces:
155            self._register_iface(iface.name)
156        return ifaces
157
158    def cleanup(self):
159        try:
160            with open(self.file_name, "r") as f:
161                for line in f:
162                    run_cmd("/sbin/ifconfig {} destroy".format(line.strip()))
163            os.unlink(self.INTERFACES_FNAME)
164        except Exception:
165            pass
166
167
168class VnetInstance(object):
169    def __init__(
170        self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface]
171    ):
172        self.name = vnet_name
173        self.alias = vnet_alias  # reference in the test topology
174        self.jid = jid
175        self.ifaces = ifaces
176        self.iface_alias_map = {}  # iface.alias: iface
177        self.iface_map = {}  # iface.name: iface
178        for iface in ifaces:
179            iface.set_vnet(vnet_name)
180            iface.set_jailed(True)
181            self.iface_alias_map[iface.alias] = iface
182            self.iface_map[iface.name] = iface
183        self.need_dad = False  # Disable duplicate address detection by default
184        self.attached = False
185        self.pipe = None
186        self.subprocess = None
187
188    def run_vnet_cmd(self, cmd):
189        if not self.attached:
190            cmd = "jexec {} {}".format(self.name, cmd)
191        return run_cmd(cmd)
192
193    def disable_dad(self):
194        self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")
195
196    def set_pipe(self, pipe):
197        self.pipe = pipe
198
199    def set_subprocess(self, p):
200        self.subprocess = p
201
202    @staticmethod
203    def attach_jid(jid: int):
204        error_code = libc.jail_attach(jid)
205        if error_code != 0:
206            raise Exception("jail_attach() failed: errno {}".format(error_code))
207
208    def attach(self):
209        self.attach_jid(self.jid)
210        self.attached = True
211
212
213class VnetFactory(object):
214    JAILS_FNAME = "created_jails.lst"
215
216    def __init__(self, test_name: str):
217        self.test_name = test_name
218        self.test_id = convert_test_name(test_name)
219        self.file_name = self.JAILS_FNAME
220        self._vnets: List[str] = []
221
222    def _register_vnet(self, vnet_name: str):
223        self._vnets.append(vnet_name)
224        with open(self.file_name, "a") as f:
225            f.write(vnet_name + "\n")
226
227    @staticmethod
228    def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
229        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
230        not_matched: List[str] = []
231        for i in range(50):
232            vnet_ifaces = run_cmd(cmd).strip().split(" ")
233            not_matched = []
234            for iface_name in ifaces:
235                if iface_name not in vnet_ifaces:
236                    not_matched.append(iface_name)
237            if len(not_matched) == 0:
238                return []
239            time.sleep(0.1)
240        return not_matched
241
242    def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]):
243        vnet_name = "jail_{}".format(self.test_id)
244        if self._vnets:
245            # add number to distinguish jails
246            vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
247        iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
248        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
249            vnet_name, iface_cmds
250        )
251        jid_str = run_cmd(cmd)
252        jid = int(jid_str)
253        if jid <= 0:
254            raise Exception("Jail creation failed, output: {}".format(jid))
255        self._register_vnet(vnet_name)
256
257        # Run expedited version of routing
258        VnetInterface.setup_loopback(vnet_name)
259
260        not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
261        if not_found:
262            raise Exception(
263                "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
264            )
265        return VnetInstance(vnet_alias, vnet_name, jid, ifaces)
266
267    def cleanup(self):
268        try:
269            with open(self.file_name) as f:
270                for line in f:
271                    jail_name = line.strip()
272                    ToolsHelper.print_output(
273                        "/usr/sbin/jexec {} ifconfig -l".format(jail_name)
274                    )
275                    run_cmd("/usr/sbin/jail -r  {}".format(line.strip()))
276            os.unlink(self.JAILS_FNAME)
277        except OSError:
278            pass
279
280
281class SingleInterfaceMap(NamedTuple):
282    ifaces: List[VnetInterface]
283    vnet_aliases: List[str]
284
285
286class VnetTestTemplate(BaseTest):
287    TOPOLOGY = {}
288
289    def _get_vnet_handler(self, vnet_alias: str):
290        handler_name = "{}_handler".format(vnet_alias)
291        return getattr(self, handler_name, None)
292
293    def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe):
294        """Base Handler to setup given VNET.
295        Can be run in a subprocess. If so, passes control to the special
296        vnetX_handler() after setting up interface addresses
297        """
298        vnet.attach()
299        print("# setup_vnet({})".format(vnet.name))
300
301        topo = obj_map["topo_map"]
302        ipv6_ifaces = []
303        # Disable DAD
304        if not vnet.need_dad:
305            vnet.disable_dad()
306        for iface in vnet.ifaces:
307            # check index of vnet within an interface
308            # as we have prefixes for both ends of the interface
309            iface_map = obj_map["iface_map"][iface.alias]
310            idx = iface_map.vnet_aliases.index(vnet.alias)
311            prefixes6 = topo[iface.alias].get("prefixes6", [])
312            prefixes4 = topo[iface.alias].get("prefixes4", [])
313            if prefixes6 or prefixes4:
314                ipv6_ifaces.append(iface)
315                iface.turn_up()
316                if prefixes6:
317                    iface.enable_ipv6()
318            for prefix in prefixes6 + prefixes4:
319                iface.setup_addr(prefix[idx])
320        for iface in ipv6_ifaces:
321            while iface.has_tentative():
322                time.sleep(0.1)
323
324        # Run actual handler
325        handler = self._get_vnet_handler(vnet.alias)
326        if handler:
327            # Do unbuffered stdout for children
328            # so the logs are present if the child hangs
329            sys.stdout.reconfigure(line_buffering=True)
330            handler(vnet, obj_map, pipe)
331
332    def setup_topology(self, topo: Dict, test_name: str):
333        """Creates jails & interfaces for the provided topology"""
334        iface_map: Dict[str, SingleInterfaceMap] = {}
335        vnet_map = {}
336        iface_factory = IfaceFactory(test_name)
337        vnet_factory = VnetFactory(test_name)
338        for obj_name, obj_data in topo.items():
339            if obj_name.startswith("if"):
340                epair_ifaces = iface_factory.create_iface(obj_name, "epair")
341                smap = SingleInterfaceMap(epair_ifaces, [])
342                iface_map[obj_name] = smap
343        for obj_name, obj_data in topo.items():
344            if obj_name.startswith("vnet"):
345                vnet_ifaces = []
346                for iface_alias in obj_data["ifaces"]:
347                    # epair creates 2 interfaces, grab first _available_
348                    # and map it to the VNET being created
349                    idx = len(iface_map[iface_alias].vnet_aliases)
350                    iface_map[iface_alias].vnet_aliases.append(obj_name)
351                    vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
352                vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces)
353                vnet_map[obj_name] = vnet
354        # Debug output
355        print("============= TEST TOPOLOGY =============")
356        for vnet_alias, vnet in vnet_map.items():
357            print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
358            handler = self._get_vnet_handler(vnet.alias)
359            if handler:
360                print(" handler: {}".format(handler.__name__), end="")
361            print()
362        for iface_alias, iface_data in iface_map.items():
363            vnets = iface_data.vnet_aliases
364            ifaces: List[VnetInterface] = iface_data.ifaces
365            if len(vnets) == 1 and len(ifaces) == 2:
366                print(
367                    "# iface {}: {}::{} -> main::{}".format(
368                        iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
369                    )
370                )
371            elif len(vnets) == 2 and len(ifaces) == 2:
372                print(
373                    "# iface {}: {}::{} -> {}::{}".format(
374                        iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
375                    )
376                )
377            else:
378                print(
379                    "# iface {}: ifaces: {} vnets: {}".format(
380                        iface_alias, vnets, [i.name for i in ifaces]
381                    )
382                )
383        print()
384        return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo}
385
386    def setup_method(self, method):
387        """Sets up all the required topology and handlers for the given test"""
388        # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
389        test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
390        test_name = test_id.split("::")[-1]
391        self.check_constraints()
392        topology = self.TOPOLOGY
393        # First, setup kernel objects - interfaces & vnets
394        obj_map = self.setup_topology(topology, test_name)
395        main_vnet = None  # one without subprocess handler
396        for vnet_alias, vnet in obj_map["vnet_map"].items():
397            if self._get_vnet_handler(vnet_alias):
398                # Need subprocess to run
399                parent_pipe, child_pipe = Pipe()
400                p = Process(
401                    target=self._setup_vnet,
402                    args=(
403                        vnet,
404                        obj_map,
405                        child_pipe,
406                    ),
407                )
408                vnet.set_pipe(parent_pipe)
409                vnet.set_subprocess(p)
410                p.start()
411            else:
412                if main_vnet is not None:
413                    raise Exception("there can be only 1 VNET w/o handler")
414                main_vnet = vnet
415        # Main vnet needs to be the last, so all the other subprocesses
416        # are started & their pipe handles collected
417        self.vnet = main_vnet
418        self._setup_vnet(main_vnet, obj_map, None)
419        # Save state for the main handler
420        self.iface_map = obj_map["iface_map"]
421        self.vnet_map = obj_map["vnet_map"]
422
423    def cleanup(self, test_id: str):
424        # pytest test id: file::class::test_name
425        test_name = test_id.split("::")[-1]
426
427        print("==== vnet cleanup ===")
428        print("# test_name: '{}'".format(test_name))
429        VnetFactory(test_name).cleanup()
430        IfaceFactory(test_name).cleanup()
431
432    def wait_object(self, pipe, timeout=5):
433        if pipe.poll(timeout):
434            return pipe.recv()
435        raise TimeoutError
436
437    @property
438    def curvnet(self):
439        pass
440
441
442class SingleVnetTestTemplate(VnetTestTemplate):
443    IPV6_PREFIXES: List[str] = []
444    IPV4_PREFIXES: List[str] = []
445
446    def setup_method(self, method):
447        topology = copy.deepcopy(
448            {
449                "vnet1": {"ifaces": ["if1"]},
450                "if1": {"prefixes4": [], "prefixes6": []},
451            }
452        )
453        for prefix in self.IPV6_PREFIXES:
454            topology["if1"]["prefixes6"].append((prefix,))
455        for prefix in self.IPV4_PREFIXES:
456            topology["if1"]["prefixes4"].append((prefix,))
457        self.TOPOLOGY = topology
458        super().setup_method(method)
459