xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision d8f87aa5fa0a4276491fa8ef436cd22605a3f9ba)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup, wait_file
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
13class NetDrvEnvBase:
14    """
15    Base class for a NIC / host environments
16
17    Attributes:
18      test_dir: Path to the source directory of the test
19      net_lib_dir: Path to the net/lib directory
20    """
21    def __init__(self, src_path):
22        self.src_path = Path(src_path)
23        self.test_dir = self.src_path.parent.resolve()
24        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()
25
26        self.env = self._load_env_file()
27
28        # Following attrs must be set be inheriting classes
29        self.dev = None
30
31    def _load_env_file(self):
32        env = os.environ.copy()
33
34        src_dir = Path(self.src_path).parent.resolve()
35        if not (src_dir / "net.config").exists():
36            return ksft_setup(env)
37
38        with open((src_dir / "net.config").as_posix(), 'r') as fp:
39            for line in fp.readlines():
40                full_file = line
41                # Strip comments
42                pos = line.find("#")
43                if pos >= 0:
44                    line = line[:pos]
45                line = line.strip()
46                if not line:
47                    continue
48                pair = line.split('=', maxsplit=1)
49                if len(pair) != 2:
50                    raise Exception("Can't parse configuration line:", full_file)
51                env[pair[0]] = pair[1]
52        return ksft_setup(env)
53
54    def __del__(self):
55        pass
56
57    def __enter__(self):
58        ip(f"link set dev {self.dev['ifname']} up")
59        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
60                  lambda x: x.strip() == "1")
61
62        return self
63
64    def __exit__(self, ex_type, ex_value, ex_tb):
65        """
66        __exit__ gets called at the end of a "with" block.
67        """
68        self.__del__()
69
70
71class NetDrvEnv(NetDrvEnvBase):
72    """
73    Class for a single NIC / host env, with no remote end
74    """
75    def __init__(self, src_path, nsim_test=None, **kwargs):
76        super().__init__(src_path)
77
78        self._ns = None
79
80        if 'NETIF' in self.env:
81            if nsim_test is True:
82                raise KsftXfailEx("Test only works on netdevsim")
83
84            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
85        else:
86            if nsim_test is False:
87                raise KsftXfailEx("Test does not work on netdevsim")
88
89            self._ns = NetdevSimDev(**kwargs)
90            self.dev = self._ns.nsims[0].dev
91        self.ifname = self.dev['ifname']
92        self.ifindex = self.dev['ifindex']
93
94    def __del__(self):
95        if self._ns:
96            self._ns.remove()
97            self._ns = None
98
99
100class NetDrvEpEnv(NetDrvEnvBase):
101    """
102    Class for an environment with a local device and "remote endpoint"
103    which can be used to send traffic in.
104
105    For local testing it creates two network namespaces and a pair
106    of netdevsim devices.
107    """
108
109    # Network prefixes used for local tests
110    nsim_v4_pfx = "192.0.2."
111    nsim_v6_pfx = "2001:db8::"
112
113    def __init__(self, src_path, nsim_test=None):
114        super().__init__(src_path)
115
116        self._stats_settle_time = None
117
118        # Things we try to destroy
119        self.remote = None
120        # These are for local testing state
121        self._netns = None
122        self._ns = None
123        self._ns_peer = None
124
125        self.addr_v        = { "4": None, "6": None }
126        self.remote_addr_v = { "4": None, "6": None }
127
128        if "NETIF" in self.env:
129            if nsim_test is True:
130                raise KsftXfailEx("Test only works on netdevsim")
131            self._check_env()
132
133            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
134
135            self.addr_v["4"] = self.env.get("LOCAL_V4")
136            self.addr_v["6"] = self.env.get("LOCAL_V6")
137            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
138            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
139            kind = self.env["REMOTE_TYPE"]
140            args = self.env["REMOTE_ARGS"]
141        else:
142            if nsim_test is False:
143                raise KsftXfailEx("Test does not work on netdevsim")
144
145            self.create_local()
146
147            self.dev = self._ns.nsims[0].dev
148
149            self.addr_v["4"] = self.nsim_v4_pfx + "1"
150            self.addr_v["6"] = self.nsim_v6_pfx + "1"
151            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
152            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
153            kind = "netns"
154            args = self._netns.name
155
156        self.remote = Remote(kind, args, src_path)
157
158        self.addr_ipver = "6" if self.addr_v["6"] else "4"
159        self.addr = self.addr_v[self.addr_ipver]
160        self.remote_addr = self.remote_addr_v[self.addr_ipver]
161
162        # Bracketed addresses, some commands need IPv6 to be inside []
163        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
164        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]
165
166        self.ifname = self.dev['ifname']
167        self.ifindex = self.dev['ifindex']
168
169        # resolve remote interface name
170        self.remote_ifname = self.resolve_remote_ifc()
171        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
172                             host=self.remote, json=True)[0]
173        self.remote_ifindex = self.remote_dev['ifindex']
174
175        self._required_cmd = {}
176
177    def create_local(self):
178        self._netns = NetNS()
179        self._ns = NetdevSimDev()
180        self._ns_peer = NetdevSimDev(ns=self._netns)
181
182        with open("/proc/self/ns/net") as nsfd0, \
183             open("/var/run/netns/" + self._netns.name) as nsfd1:
184            ifi0 = self._ns.nsims[0].ifindex
185            ifi1 = self._ns_peer.nsims[0].ifindex
186            NetdevSimDev.ctrl_write('link_device',
187                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
188
189        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
190        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
191        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
192
193        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
194        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
195        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
196
197    def _check_env(self):
198        vars_needed = [
199            ["LOCAL_V4", "LOCAL_V6"],
200            ["REMOTE_V4", "REMOTE_V6"],
201            ["REMOTE_TYPE"],
202            ["REMOTE_ARGS"]
203        ]
204        missing = []
205
206        for choice in vars_needed:
207            for entry in choice:
208                if entry in self.env:
209                    break
210            else:
211                missing.append(choice)
212        # Make sure v4 / v6 configs are symmetric
213        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
214            missing.append(["LOCAL_V6", "REMOTE_V6"])
215        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
216            missing.append(["LOCAL_V4", "REMOTE_V4"])
217        if missing:
218            raise Exception("Invalid environment, missing configuration:", missing,
219                            "Please see tools/testing/selftests/drivers/net/README.rst")
220
221    def resolve_remote_ifc(self):
222        v4 = v6 = None
223        if self.remote_addr_v["4"]:
224            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
225        if self.remote_addr_v["6"]:
226            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
227        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
228            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
229        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
230            raise Exception("Can't resolve remote interface name, multiple interfaces match")
231        return v6[0]["ifname"] if v6 else v4[0]["ifname"]
232
233    def __del__(self):
234        if self._ns:
235            self._ns.remove()
236            self._ns = None
237        if self._ns_peer:
238            self._ns_peer.remove()
239            self._ns_peer = None
240        if self._netns:
241            del self._netns
242            self._netns = None
243        if self.remote:
244            del self.remote
245            self.remote = None
246
247    def require_ipver(self, ipver):
248        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
249            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")
250
251    def require_nsim(self, nsim_test=True):
252        """Require or exclude netdevsim for this test"""
253        if nsim_test and self._ns is None:
254            raise KsftXfailEx("Test only works on netdevsim")
255        if nsim_test is False and self._ns is not None:
256            raise KsftXfailEx("Test does not work on netdevsim")
257
258    def _require_cmd(self, comm, key, host=None):
259        cached = self._required_cmd.get(comm, {})
260        if cached.get(key) is None:
261            cached[key] = cmd("command -v -- " + comm, fail=False,
262                              shell=True, host=host).ret == 0
263        self._required_cmd[comm] = cached
264        return cached[key]
265
266    def require_cmd(self, comm, local=True, remote=False):
267        if local:
268            if not self._require_cmd(comm, "local"):
269                raise KsftSkipEx("Test requires command: " + comm)
270        if remote:
271            if not self._require_cmd(comm, "remote", host=self.remote):
272                raise KsftSkipEx("Test requires (remote) command: " + comm)
273
274    def wait_hw_stats_settle(self):
275        """
276        Wait for HW stats to become consistent, some devices DMA HW stats
277        periodically so events won't be reflected until next sync.
278        Good drivers will tell us via ethtool what their sync period is.
279        """
280        if self._stats_settle_time is None:
281            data = {}
282            try:
283                data = ethtool("-c " + self.ifname, json=True)[0]
284            except CmdExitFailure as e:
285                if "Operation not supported" not in e.cmd.stderr:
286                    raise
287
288            self._stats_settle_time = 0.025 + \
289                data.get('stats-block-usecs', 0) / 1000 / 1000
290
291        time.sleep(self._stats_settle_time)
292