xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision d2b007374551ac09db16badde575cdd698f6fc92)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup, wait_file
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
class NetDrvEnvBase:
    """
    Common base for the NIC / host test environments

    Attributes:
      src_path: Path of the test source file, as given by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Environment mapping returned by ksft_setup()
      dev: Device description, None until an inheriting class sets it
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Inheriting classes are expected to fill in the attrs below
        self.dev = None

    def _load_env_file(self):
        """
        Build the environment for the test.

        Starts from a copy of os.environ and, if a "net.config" file sits
        next to the test source, overlays the KEY=VALUE pairs found in it.
        Anything following a '#' on a line is dropped, and lines which end
        up empty are skipped. The resulting mapping is passed through
        ksft_setup() and whatever that returns is handed back.

        Raises an Exception for a non-empty line without a '='.
        """
        merged = os.environ.copy()

        config = Path(self.src_path).parent.resolve() / "net.config"
        if config.exists():
            with open(config.as_posix(), 'r') as cfg:
                for raw in cfg:
                    # Cut the comment part off, then surrounding whitespace
                    text = raw.split("#", 1)[0].strip()
                    if not text:
                        continue
                    key, sep, value = text.partition('=')
                    if not sep:
                        raise Exception("Can't parse configuration line:", raw)
                    merged[key] = value
        return ksft_setup(merged)

    def __del__(self):
        """
        The base class holds nothing that needs releasing.
        """

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.

        Sets the device up and calls wait_file() on its sysfs carrier
        file with a predicate that matches the value "1".
        """
        ifname = self.dev['ifname']
        ip(f"link set dev {ifname} up")
        wait_file(f"/sys/class/net/{ifname}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block, the cleanup
        itself is delegated to __del__().
        """
        self.__del__()
69
70
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If the environment defines NETIF that interface is used, otherwise
    a netdevsim device is created and removed again on cleanup.

    Attributes:
      dev: Device description, either the first entry of the JSON output
           of "ip -d link show dev $NETIF" or the netdevsim's dev
      ifname: Interface name taken from dev
      ifindex: Interface index taken from dev
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Args:
          src_path: Path of the test source file, passed to the base class
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if either is fine
          **kwargs: Passed through to NetdevSimDev() if one gets created

        Raises:
          KsftXfailEx: the device type in use conflicts with nsim_test
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        """
        Remove the netdevsim device, if one was created.

        Safe to call more than once (__exit__ of the base class calls it
        explicitly) and on an object whose __init__ did not complete.
        """
        # super().__init__() may raise before _ns has been assigned, the
        # finalizer still runs on such a partially initialized object.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
98
99
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.

    Attributes:
      dev: Description of the local device
      ifname: Name of the local interface
      ifindex: Index of the local interface
      remote: Remote object used to run commands on the remote endpoint
      remote_ifname: Name of the interface carrying the remote address(es)
      addr_v: Local addresses, keyed by IP version ("4" / "6")
      remote_addr_v: Remote addresses, keyed by IP version ("4" / "6")
      addr_ipver: "6" if a local IPv6 address is known, "4" otherwise
      addr: Local address of version addr_ipver
      remote_addr: Remote address of version addr_ipver
      baddr: Like addr, but an IPv6 address is wrapped in []
      remote_baddr: Like remote_addr, but an IPv6 address is wrapped in []
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Args:
          src_path: Path of the test source file, passed to the base class
                    and to Remote()
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if either is fine

        Raises:
          KsftXfailEx: the device type in use conflicts with nsim_test
          Exception: NETIF is set but the rest of the configuration is
                     incomplete, or the remote interface can't be resolved
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is available
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Results of command lookups done by require_cmd(),
        # as { command: { "local" / "remote": bool } }
        self._required_cmd = {}

    def create_local(self):
        """
        Set up the netdevsim based local test environment.

        Creates a network namespace, one netdevsim device in the current
        namespace and one inside the new namespace, links the two through
        the netdevsim 'link_device' control file, then assigns the
        nsim_v4_pfx / nsim_v6_pfx addresses (".1" / "::1" locally,
        ".2" / "::2" on the peer) and sets both interfaces up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device takes "<netns fd>:<ifindex>" for each of the two ends,
        # the namespace files only need to stay open for the write
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {self._ns.nsims[0].ifname} up")

        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration needed for testing on a real device.

        At least one variable of each group in vars_needed must be present
        in self.env, and the v4 / v6 addresses must be given for both the
        local and the remote end or for neither.

        Raises an Exception listing the groups which are not satisfied.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # None of the alternatives of this group is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which owns the remote
        address(es), by running "ip addr show to <addr>" on the remote.

        Returns the interface name.

        Raises an Exception if no interface matches, if more than one
        interface matches an address, or if the v4 and v6 addresses
        resolve to different interfaces.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        # Neither lookup produced a result (address not configured on the
        # remote, or no address known at all), fail with a clear message
        # rather than by indexing into None / an empty list below.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        """
        Tear down whatever parts of the environment were created.

        Safe to call more than once (__exit__ of the base class calls it
        explicitly) and on an object whose __init__ did not complete.
        """
        # super().__init__() may raise before any of these attributes has
        # been assigned, hence the getattr() rather than direct access.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        # NOTE(review): the explicit del drops our reference, presumably so
        # that the object's own finalizer does the cleanup - confirm against
        # NetNS / Remote.
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test unless both the local and the remote address of
        the given IP version ("4" or "6") are known.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm can be found, using "command -v".

        The result is cached per command under key, so the lookup is
        only run once for each (comm, key) combination.

        Returns True if the command was found, False otherwise.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test unless command comm is available locally (if local
        is set) and on the remote endpoint (if remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing info is fine, we fall back to the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported stats period (usec -> sec),
            # computed once and reused by later calls
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
282