xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision 1a9239bb4253f9076b5b4b2a1a4e8d7defd77a95)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
13class NetDrvEnvBase:
14    """
15    Base class for a NIC / host envirnoments
16    """
17    def __init__(self, src_path):
18        self.src_path = src_path
19        self.env = self._load_env_file()
20
21    def rpath(self, path):
22        """
23        Get an absolute path to a file based on a path relative to the directory
24        containing the test which constructed env.
25
26        For example, if the test.py is in the same directory as
27        a binary (built from helper.c), the test can use env.rpath("helper")
28        to get the absolute path to the binary
29        """
30        src_dir = Path(self.src_path).parent.resolve()
31        return (src_dir / path).as_posix()
32
33    def _load_env_file(self):
34        env = os.environ.copy()
35
36        src_dir = Path(self.src_path).parent.resolve()
37        if not (src_dir / "net.config").exists():
38            return ksft_setup(env)
39
40        with open((src_dir / "net.config").as_posix(), 'r') as fp:
41            for line in fp.readlines():
42                full_file = line
43                # Strip comments
44                pos = line.find("#")
45                if pos >= 0:
46                    line = line[:pos]
47                line = line.strip()
48                if not line:
49                    continue
50                pair = line.split('=', maxsplit=1)
51                if len(pair) != 2:
52                    raise Exception("Can't parse configuration line:", full_file)
53                env[pair[0]] = pair[1]
54        return ksft_setup(env)
55
56
57class NetDrvEnv(NetDrvEnvBase):
58    """
59    Class for a single NIC / host env, with no remote end
60    """
61    def __init__(self, src_path, nsim_test=None, **kwargs):
62        super().__init__(src_path)
63
64        self._ns = None
65
66        if 'NETIF' in self.env:
67            if nsim_test is True:
68                raise KsftXfailEx("Test only works on netdevsim")
69
70            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
71        else:
72            if nsim_test is False:
73                raise KsftXfailEx("Test does not work on netdevsim")
74
75            self._ns = NetdevSimDev(**kwargs)
76            self.dev = self._ns.nsims[0].dev
77        self.ifname = self.dev['ifname']
78        self.ifindex = self.dev['ifindex']
79
80    def __enter__(self):
81        ip(f"link set dev {self.dev['ifname']} up")
82
83        return self
84
85    def __exit__(self, ex_type, ex_value, ex_tb):
86        """
87        __exit__ gets called at the end of a "with" block.
88        """
89        self.__del__()
90
91    def __del__(self):
92        if self._ns:
93            self._ns.remove()
94            self._ns = None
95
96
97class NetDrvEpEnv(NetDrvEnvBase):
98    """
99    Class for an environment with a local device and "remote endpoint"
100    which can be used to send traffic in.
101
102    For local testing it creates two network namespaces and a pair
103    of netdevsim devices.
104    """
105
106    # Network prefixes used for local tests
107    nsim_v4_pfx = "192.0.2."
108    nsim_v6_pfx = "2001:db8::"
109
110    def __init__(self, src_path, nsim_test=None):
111        super().__init__(src_path)
112
113        self._stats_settle_time = None
114
115        # Things we try to destroy
116        self.remote = None
117        # These are for local testing state
118        self._netns = None
119        self._ns = None
120        self._ns_peer = None
121
122        self.addr_v        = { "4": None, "6": None }
123        self.remote_addr_v = { "4": None, "6": None }
124
125        if "NETIF" in self.env:
126            if nsim_test is True:
127                raise KsftXfailEx("Test only works on netdevsim")
128            self._check_env()
129
130            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
131
132            self.addr_v["4"] = self.env.get("LOCAL_V4")
133            self.addr_v["6"] = self.env.get("LOCAL_V6")
134            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
135            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
136            kind = self.env["REMOTE_TYPE"]
137            args = self.env["REMOTE_ARGS"]
138        else:
139            if nsim_test is False:
140                raise KsftXfailEx("Test does not work on netdevsim")
141
142            self.create_local()
143
144            self.dev = self._ns.nsims[0].dev
145
146            self.addr_v["4"] = self.nsim_v4_pfx + "1"
147            self.addr_v["6"] = self.nsim_v6_pfx + "1"
148            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
149            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
150            kind = "netns"
151            args = self._netns.name
152
153        self.remote = Remote(kind, args, src_path)
154
155        self.addr_ipver = "6" if self.addr_v["6"] else "4"
156        self.addr = self.addr_v[self.addr_ipver]
157        self.remote_addr = self.remote_addr_v[self.addr_ipver]
158
159        # Bracketed addresses, some commands need IPv6 to be inside []
160        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
161        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]
162
163        self.ifname = self.dev['ifname']
164        self.ifindex = self.dev['ifindex']
165
166        # resolve remote interface name
167        self.remote_ifname = self.resolve_remote_ifc()
168
169        self._required_cmd = {}
170
171    def create_local(self):
172        self._netns = NetNS()
173        self._ns = NetdevSimDev()
174        self._ns_peer = NetdevSimDev(ns=self._netns)
175
176        with open("/proc/self/ns/net") as nsfd0, \
177             open("/var/run/netns/" + self._netns.name) as nsfd1:
178            ifi0 = self._ns.nsims[0].ifindex
179            ifi1 = self._ns_peer.nsims[0].ifindex
180            NetdevSimDev.ctrl_write('link_device',
181                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
182
183        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
184        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
185        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
186
187        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
188        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
189        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
190
191    def _check_env(self):
192        vars_needed = [
193            ["LOCAL_V4", "LOCAL_V6"],
194            ["REMOTE_V4", "REMOTE_V6"],
195            ["REMOTE_TYPE"],
196            ["REMOTE_ARGS"]
197        ]
198        missing = []
199
200        for choice in vars_needed:
201            for entry in choice:
202                if entry in self.env:
203                    break
204            else:
205                missing.append(choice)
206        # Make sure v4 / v6 configs are symmetric
207        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
208            missing.append(["LOCAL_V6", "REMOTE_V6"])
209        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
210            missing.append(["LOCAL_V4", "REMOTE_V4"])
211        if missing:
212            raise Exception("Invalid environment, missing configuration:", missing,
213                            "Please see tools/testing/selftests/drivers/net/README.rst")
214
215    def resolve_remote_ifc(self):
216        v4 = v6 = None
217        if self.remote_addr_v["4"]:
218            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
219        if self.remote_addr_v["6"]:
220            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
221        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
222            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
223        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
224            raise Exception("Can't resolve remote interface name, multiple interfaces match")
225        return v6[0]["ifname"] if v6 else v4[0]["ifname"]
226
227    def __enter__(self):
228        return self
229
230    def __exit__(self, ex_type, ex_value, ex_tb):
231        """
232        __exit__ gets called at the end of a "with" block.
233        """
234        self.__del__()
235
236    def __del__(self):
237        if self._ns:
238            self._ns.remove()
239            self._ns = None
240        if self._ns_peer:
241            self._ns_peer.remove()
242            self._ns_peer = None
243        if self._netns:
244            del self._netns
245            self._netns = None
246        if self.remote:
247            del self.remote
248            self.remote = None
249
250    def require_ipver(self, ipver):
251        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
252            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")
253
254    def _require_cmd(self, comm, key, host=None):
255        cached = self._required_cmd.get(comm, {})
256        if cached.get(key) is None:
257            cached[key] = cmd("command -v -- " + comm, fail=False,
258                              shell=True, host=host).ret == 0
259        self._required_cmd[comm] = cached
260        return cached[key]
261
262    def require_cmd(self, comm, local=True, remote=False):
263        if local:
264            if not self._require_cmd(comm, "local"):
265                raise KsftSkipEx("Test requires command: " + comm)
266        if remote:
267            if not self._require_cmd(comm, "remote"):
268                raise KsftSkipEx("Test requires (remote) command: " + comm)
269
270    def wait_hw_stats_settle(self):
271        """
272        Wait for HW stats to become consistent, some devices DMA HW stats
273        periodically so events won't be reflected until next sync.
274        Good drivers will tell us via ethtool what their sync period is.
275        """
276        if self._stats_settle_time is None:
277            data = {}
278            try:
279                data = ethtool("-c " + self.ifname, json=True)[0]
280            except CmdExitFailure as e:
281                if "Operation not supported" not in e.cmd.stderr:
282                    raise
283
284            self._stats_settle_time = 0.025 + \
285                data.get('stats-block-usecs', 0) / 1000 / 1000
286
287        time.sleep(self._stats_settle_time)
288