xref: /freebsd/tests/atf_python/sys/net/vnet.py (revision c0a4a7bb942fd3302f0093e4353820916d3661d1)
1#!/usr/local/bin/python3
2import copy
3import ipaddress
4import re
5import os
6import socket
7import sys
8import time
9from multiprocessing import Pipe
10from multiprocessing import Process
11from typing import Dict
12from typing import List
13from typing import NamedTuple
14
15from atf_python.sys.net.tools import ToolsHelper
16from atf_python.utils import BaseTest
17from atf_python.utils import libc
18
19
def run_cmd(cmd: str, verbose=True) -> str:
    """Run *cmd* through the shell and return everything it wrote to stdout.

    When *verbose* is true the command line is echoed to stdout first, so
    that the test log shows what was executed.  The exit status of the
    command is not checked: a failing command simply yields whatever
    (possibly empty) output it produced.
    """
    if verbose:
        print("run: '{}'".format(cmd))
    # Close the pipe deterministically rather than relying on the GC.
    with os.popen(cmd) as stream:
        return stream.read()
23
24
def get_topology_id(test_id: str) -> str:
    """
    Derive a topology id from a pytest test id.

    Only the last two "::"-separated components (class and test name) are
    kept, joined by a single colon:
      "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" ->
      "TestIP6Output:test_output6_pktinfo[ipandif]"
    """
    components = test_id.split("::")
    class_and_test = components[-2:]
    return ":".join(class_and_test)
32
33
def convert_test_name(test_name: str) -> str:
    """Sanitize a test name so it can be used in file/jail names.

    Alphanumeric characters and "_", "-", ":" are kept as-is, "[" is
    replaced by "_", and every other character is dropped.
    """
    passthrough = {"_", "-", ":"}
    pieces = []
    for ch in test_name:
        if ch.isalnum() or ch in passthrough:
            pieces.append(ch)
        elif ch == "[":
            pieces.append("_")
    return "".join(pieces)
43
44
class VnetInterface(object):
    """A network interface that is part of a test topology.

    Tracks the interface name, its alias in the topology, the vnet (jail)
    it belongs to and the addresses configured through setup_addr().
    Configuration is done by running ifconfig(8)/ndp(8) commands.
    """

    # defines from net/if_types.h
    IFT_LOOP = 0x18
    IFT_ETHER = 0x06

    def __init__(self, iface_alias: str, iface_name: str):
        self.name = iface_name
        self.alias = iface_alias
        self.vnet_name = ""
        self.jailed = False
        # family -> {address string: ipaddress interface object}
        self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
        self.prefixes4: List[List[str]] = []
        self.prefixes6: List[List[str]] = []
        # Anything whose name starts with "lo" is treated as a loopback
        self.iftype = self.IFT_LOOP if iface_name.startswith("lo") else self.IFT_ETHER

    @property
    def ifindex(self):
        """Interface index as reported by socket.if_nametoindex()."""
        return socket.if_nametoindex(self.name)

    @property
    def first_ipv6(self):
        """First IPv6 address registered via setup_addr()."""
        return next(iter(self.addr_map["inet6"].values()))

    @property
    def first_ipv4(self):
        """First IPv4 address registered via setup_addr()."""
        return next(iter(self.addr_map["inet"].values()))

    def set_vnet(self, vnet_name: str):
        """Remember the name of the vnet the interface belongs to."""
        self.vnet_name = vnet_name

    def set_jailed(self, jailed: bool):
        """Set the flag that makes run_cmd() skip the jexec prefix."""
        self.jailed = jailed

    def run_cmd(self, cmd, verbose=False):
        """Run `cmd` for this interface and return its output.

        The command is wrapped in jexec when a vnet name is set and the
        interface is not flagged as jailed.
        """
        needs_jexec = bool(self.vnet_name) and not self.jailed
        if needs_jexec:
            cmd = "jexec {} {}".format(self.vnet_name, cmd)
        return run_cmd(cmd, verbose)

    @classmethod
    def setup_loopback(cls, vnet_name: str):
        """Assign 127.0.0.1/8 to lo0 of the given vnet and bring it up."""
        loopback = VnetInterface("", "lo0")
        loopback.set_vnet(vnet_name)
        loopback.setup_addr("127.0.0.1/8")
        loopback.turn_up()

    @classmethod
    def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
        """Create an interface with `ifconfig <iface_name> create`.

        Returns one object for the created interface; when the created name
        starts with "epair", a second object is added whose name has the
        last character replaced by "b".  Both share the same alias.
        Raises Exception if ifconfig printed no interface name.
        """
        created = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
        if not created:
            raise Exception("Unable to create iface {}".format(iface_name))
        result = [cls(alias_name, created)]
        if created.startswith("epair"):
            peer_name = created[:-1] + "b"
            result.append(cls(alias_name, peer_name))
        return result

    def setup_addr(self, _addr: str):
        """Configure address `_addr` ("address/prefixlen") on the interface.

        Any IPv4 address after the first one is added with `alias`.  The
        address is recorded in addr_map keyed by its IP string.
        """
        iface_addr = ipaddress.ip_interface(_addr)
        family = "inet6" if iface_addr.version == 6 else "inet"
        if family == "inet" and self.addr_map[family]:
            cmd = "/sbin/ifconfig {} alias {}".format(self.name, iface_addr)
        else:
            cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, iface_addr)
        self.run_cmd(cmd)
        self.addr_map[family][str(iface_addr.ip)] = iface_addr

    def delete_addr(self, _addr: str):
        """Remove address `_addr` (no prefix length) and forget it.

        Raises KeyError if the address was not recorded in addr_map.
        """
        ip = ipaddress.ip_address(_addr)
        if ip.version == 6:
            family = "inet6"
            template = "/sbin/ifconfig {} inet6 {} delete"
        else:
            family = "inet"
            template = "/sbin/ifconfig {} -alias {}"
        self.run_cmd(template.format(self.name, ip))
        del self.addr_map[family][str(ip)]

    def turn_up(self):
        """Run `ifconfig <name> up`."""
        self.run_cmd("/sbin/ifconfig {} up".format(self.name))

    def enable_ipv6(self):
        """Run `ndp -i <name> -disabled` for the interface."""
        self.run_cmd("/usr/sbin/ndp -i {} -disabled".format(self.name))

    def has_tentative(self) -> bool:
        """True if an interface has some addresses in tentative state"""
        output = self.run_cmd(
            "/sbin/ifconfig {} inet6".format(self.name), verbose=False
        )
        return any("tentative" in line for line in output.splitlines())
150
151
class IfaceFactory(object):
    """Creates test interfaces and keeps a registry of those needing cleanup.

    Names of created interfaces whose type is not in AUTODELETE_TYPES are
    appended to a registry file (one name per line), so that a later
    cleanup pass can tell them apart from interfaces it did not create.
    """

    INTERFACES_FNAME = "created_ifaces.lst"
    # Interface types that are always destroyed during vnet cleanup
    AUTODELETE_TYPES = ("epair", "lo", "tap", "tun")

    def __init__(self):
        self.file_name = self.INTERFACES_FNAME

    def _register_iface(self, iface_name: str):
        """Append `iface_name` to the registry file."""
        with open(self.file_name, "a") as f:
            f.write(iface_name + "\n")

    def _list_ifaces(self) -> List[str]:
        """Return the registered interface names ([] if the file is unreadable)."""
        try:
            with open(self.file_name, "r") as f:
                return [line.strip() for line in f]
        except OSError:
            return []

    def create_iface(self, alias_name: str, iface_name: str) -> List["VnetInterface"]:
        """Create interface(s) via VnetInterface.create_iface() and register
        every resulting interface that is not of an auto-deleted type."""
        ifaces = VnetInterface.create_iface(alias_name, iface_name)
        for iface in ifaces:
            if not self.is_autodeleted(iface.name):
                self._register_iface(iface.name)
        return ifaces

    @staticmethod
    def is_autodeleted(iface_name: str) -> bool:
        """True if the name's prefix before the first digit run is in
        AUTODELETE_TYPES (e.g. "epair0a" -> "epair")."""
        iface_type = re.split(r"\d+", iface_name)[0]
        return iface_type in IfaceFactory.AUTODELETE_TYPES

    def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]:
        """Destroy the interfaces of vnet `vnet_name` created by the tests.

        Interfaces of an auto-deleted type are always destroyed; any other
        interface is destroyed only if it is listed in the registry file and
        is skipped (with a log line) otherwise.

        Returns the names of the interfaces a destroy command was issued for.
        """
        # NOTE(review): ToolsHelper.get_output() presumably returns the
        # command's stdout as a string -- it is split on whitespace here.
        ifaces_lst = ToolsHelper.get_output(
            "/usr/sbin/jexec {} ifconfig -l".format(vnet_name)
        )
        # Read the registry once instead of once per interface
        registered = set(self._list_ifaces())
        destroyed: List[str] = []
        for iface_name in ifaces_lst.split():
            if not self.is_autodeleted(iface_name) and iface_name not in registered:
                print("Skipping interface {}:{}".format(vnet_name, iface_name))
                continue
            run_cmd(
                "/usr/sbin/jexec {} ifconfig {} destroy".format(vnet_name, iface_name)
            )
            destroyed.append(iface_name)
        return destroyed

    def cleanup(self):
        """Remove the registry file; a missing file is not an error."""
        try:
            # Remove the file this instance actually reads and writes
            os.unlink(self.file_name)
        except OSError:
            pass
204
205
class VnetInstance(object):
    """A vnet jail of the test topology together with its interfaces.

    On construction every passed interface is bound to the vnet name and
    flagged as jailed, and lookup maps by alias and by name are built.
    """

    def __init__(
        self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface]
    ):
        self.name = vnet_name
        self.alias = vnet_alias  # reference in the test topology
        self.jid = jid
        self.ifaces = ifaces
        for member in ifaces:
            member.set_vnet(vnet_name)
            member.set_jailed(True)
        self.iface_alias_map = {member.alias: member for member in ifaces}
        self.iface_map = {member.name: member for member in ifaces}
        self.need_dad = False  # Disable duplicate address detection by default
        self.attached = False
        self.pipe = None
        self.subprocess = None

    def run_vnet_cmd(self, cmd):
        """Run `cmd` for this vnet, through jexec unless attach() was called."""
        full_cmd = cmd if self.attached else "jexec {} {}".format(self.name, cmd)
        return run_cmd(full_cmd)

    def disable_dad(self):
        """Set the net.inet6.ip6.dad_count sysctl to 0 for this vnet."""
        self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")

    def set_pipe(self, pipe):
        """Store the pipe end associated with this vnet."""
        self.pipe = pipe

    def set_subprocess(self, p):
        """Store the process object running this vnet's handler."""
        self.subprocess = p

    @staticmethod
    def attach_jid(jid: int):
        """Attach the current process to jail `jid` via libc.jail_attach().

        Raises Exception when the call returns a non-zero value.
        """
        rc = libc.jail_attach(jid)
        if rc == 0:
            return
        raise Exception("jail_attach() failed: errno {}".format(rc))

    def attach(self):
        """Attach the current process to this vnet's jail and record it."""
        self.attach_jid(self.jid)
        self.attached = True
249
250
class VnetFactory(object):
    """Creates vnet jails for a test topology and removes them afterwards.

    Names of created jails are appended to a registry file (one name per
    line) which cleanup() later uses to find the jails to remove.
    """

    JAILS_FNAME = "created_jails.lst"

    def __init__(self, topology_id: str):
        self.topology_id = topology_id
        self.file_name = self.JAILS_FNAME
        # Names of the jails created by this instance, in creation order
        self._vnets: List[str] = []

    def _register_vnet(self, vnet_name: str):
        """Record `vnet_name` in memory and in the registry file."""
        self._vnets.append(vnet_name)
        with open(self.file_name, "a") as f:
            f.write(vnet_name + "\n")

    @staticmethod
    def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
        """Poll until all `ifaces` are listed by ifconfig inside `vnet_name`.

        Polls up to 50 times with a 0.1 second pause between attempts.
        Returns [] on success or the names still missing after the last try.
        """
        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
        not_matched: List[str] = []
        for _ in range(50):
            # split() without arguments tolerates any whitespace in the output
            vnet_ifaces = set(run_cmd(cmd).split())
            not_matched = [name for name in ifaces if name not in vnet_ifaces]
            if not not_matched:
                return []
            time.sleep(0.1)
        return not_matched

    def create_vnet(self, vnet_alias: str, ifaces: List["VnetInterface"]):
        """Create a persistent vnet jail owning `ifaces`.

        The first jail is named "pytest:<sanitized topology id>"; later ones
        get a "_<N>" suffix.  Returns the VnetInstance describing the jail.

        Raises ValueError if jail(8) did not print a numeric jail id, and
        Exception if some interfaces never show up inside the jail.
        """
        vnet_name = "pytest:{}".format(convert_test_name(self.topology_id))
        if self._vnets:
            # add number to distinguish jails
            vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
        iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
            vnet_name, iface_cmds
        )
        jid_str = run_cmd(cmd)
        try:
            jid = int(jid_str)
        except ValueError:
            print("Jail creation failed, output: {}".format(jid_str))
            raise
        self._register_vnet(vnet_name)

        # Configure 127.0.0.1/8 on lo0 of the new jail and bring it up
        VnetInterface.setup_loopback(vnet_name)

        not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
        if not_found:
            raise Exception(
                "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
            )
        return VnetInstance(vnet_alias, vnet_name, jid, ifaces)

    def cleanup(self):
        """Destroy all registered jails and remove the registry file.

        Best effort: does nothing when the registry file cannot be read.
        """
        try:
            with open(self.file_name) as f:
                vnet_names = [line.strip() for line in f]
        except OSError:
            return
        iface_factory = IfaceFactory()
        for vnet_name in vnet_names:
            if not vnet_name:
                # A blank line would yield malformed jexec/jail commands
                continue
            iface_factory.cleanup_vnet_interfaces(vnet_name)
            run_cmd("/usr/sbin/jail -r  {}".format(vnet_name))
        try:
            # Remove the file that was actually read above
            os.unlink(self.file_name)
        except OSError:
            pass
318
319
class SingleInterfaceMap(NamedTuple):
    """Interfaces created for one topology interface alias, plus the vnets
    they were assigned to."""

    # Interface objects created for the alias (see setup_topology())
    ifaces: List[VnetInterface]
    # Aliases of the vnets using the interface, in assignment order:
    # setup_topology() gives ifaces[i] to the vnet stored at vnet_aliases[i]
    vnet_aliases: List[str]
323
324
class ObjectsMap(NamedTuple):
    """Result of setup_topology(): the created objects and the topology
    description they were built from."""

    iface_map: Dict[str, SingleInterfaceMap]  # keyed by ifX
    vnet_map: Dict[str, VnetInstance]  # keyed by vnetX
    topo_map: Dict  # self.TOPOLOGY
329
330
class VnetTestTemplate(BaseTest):
    """Base class for tests that run on top of a vnet/interface topology.

    Subclasses describe the topology in TOPOLOGY: keys starting with "if"
    describe interfaces (optional "prefixes4"/"prefixes6" lists), keys
    starting with "vnet" describe jails (an "ifaces" list of interface
    aliases).  A method named "<vnet alias>_handler", if present, is run in
    a separate process attached to that vnet; exactly one vnet has to be
    left without a handler and becomes the vnet of the test itself.
    """

    TOPOLOGY = {}

    def _get_vnet_handler(self, vnet_alias: str):
        """Return the "<vnet_alias>_handler" attribute, or None if absent."""
        handler_name = "{}_handler".format(vnet_alias)
        return getattr(self, handler_name, None)

    def _setup_vnet(self, vnet: VnetInstance, obj_map: ObjectsMap, pipe):
        """Base Handler to setup given VNET.
        Can be run in a subprocess. If so, passes control to the special
        vnetX_handler() after setting up interface addresses

        Attaches the current process to the vnet, so it must only be called
        once per process.  `pipe`, when not None, replaces the pipe stored
        in `vnet`.
        """
        vnet.attach()
        print("# setup_vnet({})".format(vnet.name))
        if pipe is not None:
            vnet.set_pipe(pipe)

        topo = obj_map.topo_map
        # Interfaces that got addresses and thus may have tentative ones
        configured_ifaces = []
        # Disable DAD
        if not vnet.need_dad:
            vnet.disable_dad()
        for iface in vnet.ifaces:
            # check index of vnet within an interface
            # as we have prefixes for both ends of the interface
            iface_map = obj_map.iface_map[iface.alias]
            idx = iface_map.vnet_aliases.index(vnet.alias)
            prefixes6 = topo[iface.alias].get("prefixes6", [])
            prefixes4 = topo[iface.alias].get("prefixes4", [])
            if prefixes6 or prefixes4:
                configured_ifaces.append(iface)
                iface.turn_up()
                if prefixes6:
                    iface.enable_ipv6()
            for prefix in prefixes6 + prefixes4:
                iface.setup_addr(prefix[idx])
        # Wait until no IPv6 address is reported as tentative anymore
        for iface in configured_ifaces:
            while iface.has_tentative():
                time.sleep(0.1)

        # Run actual handler
        handler = self._get_vnet_handler(vnet.alias)
        if handler:
            # Line-buffer stdout for children
            # so the logs are present if the child hangs
            sys.stdout.reconfigure(line_buffering=True)
            handler(vnet)

    def setup_topology(self, topo: Dict, topology_id: str):
        """Creates jails & interfaces for the provided topology

        Every "if*" entry of `topo` becomes an epair; every "vnet*" entry
        becomes a jail receiving, for each listed interface alias, the next
        not yet assigned interface of that epair.  Returns an ObjectsMap.
        """
        iface_map: Dict[str, SingleInterfaceMap] = {}
        vnet_map = {}
        iface_factory = IfaceFactory()
        vnet_factory = VnetFactory(topology_id)
        # Interfaces first: the vnets below refer to them
        for obj_name in topo:
            if obj_name.startswith("if"):
                epair_ifaces = iface_factory.create_iface(obj_name, "epair")
                iface_map[obj_name] = SingleInterfaceMap(epair_ifaces, [])
        for obj_name, obj_data in topo.items():
            if obj_name.startswith("vnet"):
                vnet_ifaces = []
                for iface_alias in obj_data["ifaces"]:
                    # epair creates 2 interfaces, grab first _available_
                    # and map it to the VNET being created
                    idx = len(iface_map[iface_alias].vnet_aliases)
                    iface_map[iface_alias].vnet_aliases.append(obj_name)
                    vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
                vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces)
                vnet_map[obj_name] = vnet
        # Debug output
        print("============= TEST TOPOLOGY =============")
        for vnet in vnet_map.values():
            print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
            handler = self._get_vnet_handler(vnet.alias)
            if handler:
                print(" handler: {}".format(handler.__name__), end="")
            print()
        for iface_alias, iface_data in iface_map.items():
            vnets = iface_data.vnet_aliases
            ifaces: List[VnetInterface] = iface_data.ifaces
            if len(vnets) == 1 and len(ifaces) == 2:
                print(
                    "# iface {}: {}::{} -> main::{}".format(
                        iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
                    )
                )
            elif len(vnets) == 2 and len(ifaces) == 2:
                print(
                    "# iface {}: {}::{} -> {}::{}".format(
                        iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
                    )
                )
            else:
                # Argument order matches the labels in the format string
                print(
                    "# iface {}: ifaces: {} vnets: {}".format(
                        iface_alias, [i.name for i in ifaces], vnets
                    )
                )
        print()
        return ObjectsMap(iface_map, vnet_map, topo)

    def setup_method(self, _method):
        """Sets up all the required topology and handlers for the given test

        Raises Exception unless exactly one vnet has no handler.
        """
        super().setup_method(_method)
        # TestIP6Output.test_output6_pktinfo[ipandif]
        topology_id = get_topology_id(self.test_id)
        topology = self.TOPOLOGY
        # First, setup kernel objects - interfaces & vnets
        obj_map = self.setup_topology(topology, topology_id)
        main_vnet = None  # one without subprocess handler
        for vnet_alias, vnet in obj_map.vnet_map.items():
            if self._get_vnet_handler(vnet_alias):
                # Need subprocess to run
                parent_pipe, child_pipe = Pipe()
                p = Process(
                    target=self._setup_vnet,
                    args=(
                        vnet,
                        obj_map,
                        child_pipe,
                    ),
                )
                vnet.set_pipe(parent_pipe)
                vnet.set_subprocess(p)
                p.start()
            else:
                if main_vnet is not None:
                    raise Exception("there can be only 1 VNET w/o handler")
                main_vnet = vnet
        # Main vnet needs to be the last, so all the other subprocesses
        # are started & their pipe handles collected
        self.vnet = main_vnet
        if main_vnet is None:
            # Fail with a clear message instead of an AttributeError on None
            raise Exception("there must be exactly 1 VNET w/o handler, found none")
        self._setup_vnet(main_vnet, obj_map, None)
        # Save state for the main handler
        self.iface_map = obj_map.iface_map
        self.vnet_map = obj_map.vnet_map

    def cleanup(self, test_id: str):
        """Remove the jails and interfaces recorded in the registry files.

        The `test_id` argument is not read; the topology id is derived
        from self.test_id and is only printed.
        """
        # pytest test id: file::class::test_name
        topology_id = get_topology_id(self.test_id)

        print("==== vnet cleanup ===")
        print("# topology_id: '{}'".format(topology_id))
        VnetFactory(topology_id).cleanup()
        IfaceFactory().cleanup()

    def wait_object(self, pipe, timeout=5):
        """Receive one object from `pipe`.

        Raises TimeoutError if nothing arrives within `timeout` seconds.
        """
        if pipe.poll(timeout):
            return pipe.recv()
        raise TimeoutError

    def send_object(self, pipe, obj):
        """Send `obj` over `pipe`."""
        pipe.send(obj)

    @property
    def curvnet(self):
        """Placeholder: not implemented, always evaluates to None."""
        pass
489
490
class SingleVnetTestTemplate(VnetTestTemplate):
    """Template for tests needing a single vnet ("vnet1") with a single
    interface ("if1") configured with the class-level prefixes."""

    IPV6_PREFIXES: List[str] = []
    IPV4_PREFIXES: List[str] = []

    def setup_method(self, method):
        """Build the one-vnet topology as an instance attribute and run the
        regular topology setup.

        Every prefix is wrapped in a 1-tuple.
        """
        prefixes6 = [(prefix,) for prefix in self.IPV6_PREFIXES]
        prefixes4 = [(prefix,) for prefix in self.IPV4_PREFIXES]
        self.TOPOLOGY = {
            "vnet1": {"ifaces": ["if1"]},
            "if1": {"prefixes4": prefixes4, "prefixes6": prefixes6},
        }
        super().setup_method(method)
508