xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision 2151003e773c7e7dba4d64bed4bfc483681b5f6a)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
class NetDrvEnvBase:
    """
    Base class for NIC / host environments.

    Remembers the path of the test which constructed the env and builds
    the effective configuration (self.env) from the process environment,
    optionally overlaid with a "net.config" file sitting next to the test.
    """
    def __init__(self, src_path):
        self.src_path = src_path
        self.env = self._load_env_file()

    def rpath(self, path):
        """
        Turn a path relative to the test's directory into an absolute one.

        The test's directory is the resolved parent of src_path, so when
        test.py lives next to a binary (built from helper.c) the test can
        call env.rpath("helper") to get the absolute path to that binary.
        """
        test_dir = Path(self.src_path).parent.resolve()
        target = test_dir / path
        return target.as_posix()

    def _load_env_file(self):
        """
        Build the configuration dict for this environment.

        Starts from a copy of os.environ. If a "net.config" file exists in
        the test's directory, each KEY=VALUE line of it is added on top
        (overriding the process environment). Text after a '#' and blank
        lines are ignored. The resulting dict is handed to ksft_setup()
        and whatever that returns is the result.

        Raises Exception for a non-empty line which contains no '='.
        """
        settings = os.environ.copy()

        cfg_file = Path(self.src_path).parent.resolve() / "net.config"
        if cfg_file.exists():
            with open(cfg_file.as_posix(), 'r') as cfg:
                for raw_line in cfg:
                    # Everything from the first '#' onwards is a comment
                    content = raw_line.split("#", 1)[0].strip()
                    if not content:
                        continue
                    # Only the first '=' separates the key from the value
                    key, sep, value = content.partition('=')
                    if not sep:
                        raise Exception("Can't parse configuration line:", raw_line)
                    settings[key] = value
        return ksft_setup(settings)
55
56
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the configuration the named interface is used,
    otherwise a netdevsim device is created (and removed again on exit).
    """
    def __init__(self, src_path, **kwargs):
        """
        src_path is the path of the test constructing the env, kwargs are
        passed to NetdevSimDev() when no NETIF is configured.
        """
        # Set before anything that may raise (loading the config included),
        # so that __del__ never trips over a half-constructed object.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
        else:
            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        """
        Bring the device up when entering the "with" block.
        """
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """
        Remove the netdevsim device, if we created one. Safe to call more
        than once and on an object whose __init__ did not complete.
        """
        # getattr() because the finalizer also runs for objects on which
        # __init__ raised before (or never got to) assigning _ns.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
89
90
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates a network namespace for the peer and
    a pair of linked netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        src_path is the path of the test constructing the env.

        nsim_test restricts where the test may run: True means netdevsim
        only, False means real devices only, None means either. A mismatch
        raises KsftXfailEx.
        """
        # Teardown state is initialized before anything that may raise
        # (loading the config included), so that __del__ is always safe.
        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None
        # Cache of command availability checks, see _require_cmd()
        self._required_cmd = {}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]

            self.v4 = self.env.get("LOCAL_V4")
            self.v6 = self.env.get("LOCAL_V6")
            self.remote_v4 = self.env.get("REMOTE_V4")
            self.remote_v6 = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.v4 = self.nsim_v4_pfx + "1"
            self.v6 = self.nsim_v6_pfx + "1"
            self.remote_v4 = self.nsim_v4_pfx + "2"
            self.remote_v6 = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is configured
        self.addr = self.v6 if self.v6 else self.v4
        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4

        self.addr_ipver = "6" if self.v6 else "4"
        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim
        device in the current namespace and one in the new namespace,
        linked to each other, with addresses assigned and links up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link request identifies each end by netns fd + ifindex,
        # the fds have to stay open while the request is written.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {self._ns.nsims[0].ifname} up")

        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration used with a real device (NETIF set).

        At least one variable of each group below must be present, and
        the IPv4 / IPv6 addresses must be given for both ends or neither.
        Raises Exception listing the offending groups otherwise.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = [choice for choice in vars_needed
                   if not any(entry in self.env for entry in choice)]

        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """
        Tear down whatever was created for this env. Safe to call more
        than once and on an object whose __init__ did not complete.
        """
        # getattr() because the finalizer also runs for objects on which
        # __init__ raised before (or never got to) assigning the fields.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_v4(self):
        """
        Skip the test (KsftSkipEx) unless both ends have an IPv4 address.
        """
        if not self.v4 or not self.remote_v4:
            raise KsftSkipEx("Test requires IPv4 connectivity")

    def require_v6(self):
        """
        Skip the test (KsftSkipEx) unless both ends have an IPv6 address.
        """
        if not self.v6 or not self.remote_v6:
            raise KsftSkipEx("Test requires IPv6 connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if "command -v" finds comm, caching the answer.

        key names the cache slot ("local" / "remote"), host is passed
        through to cmd() - presumably selecting where the check runs.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test (KsftSkipEx) if command comm is not available
        locally (local=True) and / or on the remote endpoint (remote=True).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # The remote check must be given the remote as host, otherwise
            # it would just repeat the local check.
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 ms base delay plus the reported period (usecs -> seconds)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
268