xref: /freebsd/tests/atf_python/sys/net/tools.py (revision 4fbb9c43aa44d9145151bb5f77d302ba01fb7551)
1#!/usr/local/bin/python3
2import json
3import os
4import subprocess
5
6
class ToolsHelper(object):
    """Collection of classmethod helpers that run system networking tools.

    The helpers shell out to netstat, ifconfig, pfctl and sysctl and either
    print the tool output or parse it into Python data structures.  The class
    holds no instance state; every helper is a classmethod.
    """

    NETSTAT_PATH = "/usr/bin/netstat"
    IFCONFIG_PATH = "/sbin/ifconfig"

    # Maps the address family names accepted by get_routes()/get_nhops()
    # to the matching netstat command-line flag.
    _FAMILY_FLAGS = {"inet": "-4", "inet6": "-6"}

    @classmethod
    def get_output(cls, cmd: str, verbose=False) -> str:
        """Run *cmd* through the shell and return everything it wrote to stdout.

        Args:
            cmd: Shell command line to execute.
            verbose: When true, print the command before running it.

        Returns:
            The captured standard output as a string.
        """
        if verbose:
            print("run: '{}'".format(cmd))
        # Close the pipe explicitly so the file object and the child process
        # are released right away rather than whenever the GC gets to them.
        with os.popen(cmd) as pipe:
            return pipe.read()

    @classmethod
    def pf_rules(cls, rules, verbose=True):
        """Load *rules* by feeding them to ``/sbin/pfctl -g -f -`` on stdin.

        Args:
            rules: Iterable of rule strings; each one is written on its own
                line.
            verbose: When true, print the ruleset before loading it and the
                output of ``/sbin/pfctl -sr`` afterwards.

        Raises:
            Exception: If pfctl exits with a non-zero status.
        """
        pf_conf = "".join(rule + "\n" for rule in rules)

        if verbose:
            print("Set rules:")
            print(pf_conf)

        # The command line is fixed, so no shell is needed to run it.
        proc = subprocess.run(
            ["/sbin/pfctl", "-g", "-f", "-"], input=pf_conf.encode("utf-8")
        )
        if proc.returncode != 0:
            raise Exception("Failed to set pf rules %d" % proc.returncode)

        if verbose:
            cls.print_output("/sbin/pfctl -sr")

    @classmethod
    def print_output(cls, cmd: str, verbose=True):
        """Run *cmd* and print its output.

        Args:
            cmd: Shell command line to execute.
            verbose: When true, print a header naming the command before the
                output and an empty line after it.
        """
        if verbose:
            print("======= {} =====".format(cmd))
        print(cls.get_output(cmd))
        if verbose:
            print()

    @classmethod
    def print_net_debug(cls):
        """Print the output of ``ifconfig`` and ``netstat -rnW``."""
        cls.print_output("ifconfig")
        cls.print_output("netstat -rnW")

    @classmethod
    def set_sysctl(cls, oid, val):
        """Run ``sysctl oid=val``; the command output is discarded."""
        cls.get_output("sysctl {}={}".format(oid, val))

    @classmethod
    def _netstat_json(cls, flags: str, family: str, fibnum: int):
        """Run netstat with libxo JSON output and return its "statistics" object.

        Args:
            flags: Netstat mode flags, e.g. ``"-rnW"``.
            family: Address family name, ``"inet"`` or ``"inet6"``.
            fibnum: Value passed to netstat's ``-F`` option.

        Raises:
            ValueError: If *family* is not a supported family name.
        """
        try:
            family_flag = cls._FAMILY_FLAGS[family]
        except KeyError:
            # Fail before running netstat instead of passing the literal
            # string "None" on its command line.
            raise ValueError(
                "unsupported address family {!r}; expected one of {}".format(
                    family, sorted(cls._FAMILY_FLAGS)
                )
            ) from None
        out = cls.get_output(
            "{} {} {} -F {} --libxo json".format(
                cls.NETSTAT_PATH, family_flag, flags, fibnum
            )
        )
        return json.loads(out)["statistics"]

    @classmethod
    def get_routes(cls, family: str, fibnum: int = 0):
        """Return the "rt-entry" list reported by ``netstat -rnW``.

        Args:
            family: Address family name, ``"inet"`` or ``"inet6"``.
            fibnum: Value passed to netstat's ``-F`` option.

        Returns:
            The "rt-entry" value of the first "rt-family" element, or an
            empty list when netstat reports no families.

        Raises:
            ValueError: If *family* is not a supported family name.
        """
        stats = cls._netstat_json("-rnW", family, fibnum)
        families = stats["route-information"]["route-table"]["rt-family"]
        return families[0]["rt-entry"] if families else []

    @classmethod
    def get_nhops(cls, family: str, fibnum: int = 0):
        """Return the "nh-entry" list reported by ``netstat -onW``.

        Args:
            family: Address family name, ``"inet"`` or ``"inet6"``.
            fibnum: Value passed to netstat's ``-F`` option.

        Returns:
            The "nh-entry" value of the first "rt-family" element, or an
            empty list when netstat reports no families.

        Raises:
            ValueError: If *family* is not a supported family name.
        """
        stats = cls._netstat_json("-onW", family, fibnum)
        families = stats["route-nhop-information"]["nhop-table"]["rt-family"]
        return families[0]["nh-entry"] if families else []

    @classmethod
    def _parse_linklocals(cls, output: str):
        """Extract fe80 addresses per interface from ifconfig-style *output*.

        A line whose first character is alphanumeric starts a new interface;
        its name is the text before the first ":".  Any other line is a detail
        line belonging to the most recent interface.

        Returns:
            Dict mapping interface name to a list of ``(address, scopeid)``
            tuples, with the ``%ifname`` suffix stripped from the address and
            the scope id parsed as a hexadecimal integer.

        Raises:
            ValueError: If an fe80 line carries no ``scopeid`` token.
        """
        ret = {}
        current = None  # address list of the interface being parsed
        for line in output.splitlines():
            words = line.split()
            if not words:
                # Empty or whitespace-only line: nothing to parse.
                continue
            if line[0].isalnum():
                current = []
                ret[line.split(":")[0]] = current
            elif current is None:
                # Detail line seen before any interface header.
                continue
            elif words[0] == "inet6" and len(words) > 1 and words[1].startswith("fe80"):
                # inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2
                ip = words[1].split("%")[0]
                scopeid = int(words[words.index("scopeid") + 1], 16)
                current.append((ip, scopeid))
        return ret

    @classmethod
    def get_linklocals(cls):
        """Run ifconfig and return the fe80 addresses of every interface.

        Returns:
            Dict mapping interface name to a list of ``(address, scopeid)``
            tuples; interfaces without such addresses map to an empty list.
        """
        return cls._parse_linklocals(cls.get_output(cls.IFCONFIG_PATH))
101