xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision 8f7aa3d3c7323f4ca2768a9e74ebbe359c4f8f88)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup, wait_file
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
class NetDrvEnvBase:
    """
    Base class for NIC / host environments

    Attributes:
      src_path: Path of the test source file, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Environment returned by ksft_setup(), built from a copy of
           os.environ plus the optional net.config next to the test
      dev: Device under test, None until an inheriting class sets it
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    @staticmethod
    def _parse_config_line(line):
        """
        Parse a single line of a net.config file.

        Everything from the first '#' on is a comment. Whitespace around
        the key and around the value is ignored, so "KEY = value" and
        "KEY=value" are equivalent.

        Returns a (key, value) tuple, or None when the line carries no
        assignment (blank or comment only).
        Raises Exception when the line has no '=' or has an empty key.
        """
        # Strip comments
        content = line.partition("#")[0].strip()
        if not content:
            return None

        # Split on the first '=' only, the value itself may contain '='
        key, sep, value = content.partition("=")
        key = key.strip()
        # An empty key can't name an environment variable, reject it here
        # rather than letting it break command execution later on
        if not sep or not key:
            raise Exception("Can't parse configuration line:", line)
        return key, value.strip()

    def _load_env_file(self):
        """
        Build the environment for the test.

        Starts from a copy of os.environ. If a net.config file exists in
        the directory of the test source its KEY=value entries are layered
        on top. The result is passed through ksft_setup() and returned.
        """
        env = os.environ.copy()

        src_dir = Path(self.src_path).parent.resolve()
        if not (src_dir / "net.config").exists():
            return ksft_setup(env)

        with open((src_dir / "net.config").as_posix(), 'r') as fp:
            # Iterate the file directly, no need to load all lines up front
            for line in fp:
                pair = self._parse_config_line(line)
                if pair is None:
                    continue
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release here, inheriting classes override this
        pass

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.

        Sets the device up, then hands the device's carrier file to
        wait_file() together with a check for the value "1".
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        # Run the cleanup right away instead of waiting for the object
        # to be garbage collected
        self.__del__()
69
70
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the environment that interface is used,
    otherwise a netdevsim device is created, with **kwargs passed on
    to NetdevSimDev().

    Arguments:
      src_path: path of the test source file
      nsim_test: True if the test only works on netdevsim, False if it
                 does not work on netdevsim, None (default) if either
                 is fine. A mismatch raises KsftXfailEx.

    Attributes:
      ifname: name of the device under test
      ifindex: ifindex of the device under test
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        super().__init__(src_path)

        # netdevsim instance, only set if we created one ourselves
        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before _ns
        # got assigned (e.g. the base class failed to load the config),
        # so the attribute may not exist yet.
        nsim = getattr(self, "_ns", None)
        if nsim:
            nsim.remove()
            # Makes repeated calls (__exit__, then garbage collection) safe
            self._ns = None
97            self._ns = None
98
99
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates a network namespace and a pair of
    linked netdevsim devices, one of them inside that namespace.

    Attributes:
      addr_v: local addresses, indexed by IP version ("4" / "6")
      remote_addr_v: remote addresses, indexed by IP version ("4" / "6")
      addr_ipver: IP version of addr / remote_addr, "6" if a local
                  IPv6 address is configured, "4" otherwise
      addr: local address of version addr_ipver
      remote_addr: remote address of version addr_ipver
      baddr: local address, IPv6 is wrapped in []
      remote_baddr: remote address, IPv6 is wrapped in []
      ifname: name of the local device
      ifindex: ifindex of the local device
      remote: Remote() object used to run commands on the remote endpoint
      remote_ifname: name of the remote device
      remote_dev: "ip -d link show" info of the remote device
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Arguments:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if
                     it does not work on netdevsim, None (default) if
                     either is fine. A mismatch raises KsftXfailEx.
        """
        super().__init__(src_path)

        # Lazily filled by wait_hw_stats_settle()
        self._stats_settle_time = None
        # Results of require_cmd() checks, see _require_cmd()
        self._required_cmd = {}

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v        = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            # Must match the addresses create_local() has assigned
            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is used whenever it is available
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()
        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
                             host=self.remote, json=True)[0]

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim
        in the current namespace and one in the new namespace. The two
        netdevsims get linked, addressed from nsim_v4_pfx / nsim_v6_pfx
        (local is ...1, peer is ...2) and set up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link_device control file is given "<netns fd>:<ifindex>"
        # for each end, the fds only need to stay open for the write
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {self._ns.nsims[0].ifname} up")

        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration of a real (NETIF) setup.

        Raises Exception listing the missing variables if the remote
        type / args are not given, if there is no local or no remote
        address, or if an IP version is configured on one end only.
        """
        # At least one variable of each inner list has to be present
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface by looking up which
        interface on the remote end holds the remote address(es).

        Raises Exception if no interface matches, if more than one
        interface matches, or if the v4 and v6 lookups disagree.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        # Without this check we'd die indexing an empty result below
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before the
        # attributes below got assigned (e.g. the base class failed to
        # load the config), so look them up defensively. Resetting to
        # None makes repeated calls (__exit__, then GC) safe.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test (KsftSkipEx) unless both ends have an address
        of the given IP version, ipver is "4" or "6".
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the local device is a netdevsim
        created by this environment.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check if command comm is available, running "command -v" on host.
        Results are remembered per command and per key, so each
        command / key pair is only probed once.

        Returns True if the probe exited with 0, False otherwise.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test (KsftSkipEx) if command comm is not available
        locally (when local is set) or on the remote endpoint (when
        remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing support is fine, we use the base delay only
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec of base delay plus the reported period, in seconds
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
287        time.sleep(self._stats_settle_time)
288