xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision a3a02a52bcfcbcc4a637d4b68bf1bc391c9fad02)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import cmd, ethtool, ip
8from lib.py import NetNS, NetdevSimDev
9from .remote import Remote
10
11
12def _load_env_file(src_path):
13    env = os.environ.copy()
14
15    src_dir = Path(src_path).parent.resolve()
16    if not (src_dir / "net.config").exists():
17        return env
18
19    with open((src_dir / "net.config").as_posix(), 'r') as fp:
20        for line in fp.readlines():
21            full_file = line
22            # Strip comments
23            pos = line.find("#")
24            if pos >= 0:
25                line = line[:pos]
26            line = line.strip()
27            if not line:
28                continue
29            pair = line.split('=', maxsplit=1)
30            if len(pair) != 2:
31                raise Exception("Can't parse configuration line:", full_file)
32            env[pair[0]] = pair[1]
33    return env
34
35
36class NetDrvEnv:
37    """
38    Class for a single NIC / host env, with no remote end
39    """
40    def __init__(self, src_path, **kwargs):
41        self._ns = None
42
43        self.env = _load_env_file(src_path)
44
45        if 'NETIF' in self.env:
46            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
47        else:
48            self._ns = NetdevSimDev(**kwargs)
49            self.dev = self._ns.nsims[0].dev
50        self.ifindex = self.dev['ifindex']
51
52    def __enter__(self):
53        ip(f"link set dev {self.dev['ifname']} up")
54
55        return self
56
57    def __exit__(self, ex_type, ex_value, ex_tb):
58        """
59        __exit__ gets called at the end of a "with" block.
60        """
61        self.__del__()
62
63    def __del__(self):
64        if self._ns:
65            self._ns.remove()
66            self._ns = None
67
68
69class NetDrvEpEnv:
70    """
71    Class for an environment with a local device and "remote endpoint"
72    which can be used to send traffic in.
73
74    For local testing it creates two network namespaces and a pair
75    of netdevsim devices.
76    """
77
78    # Network prefixes used for local tests
79    nsim_v4_pfx = "192.0.2."
80    nsim_v6_pfx = "2001:db8::"
81
82    def __init__(self, src_path, nsim_test=None):
83
84        self.env = _load_env_file(src_path)
85
86        self._stats_settle_time = None
87
88        # Things we try to destroy
89        self.remote = None
90        # These are for local testing state
91        self._netns = None
92        self._ns = None
93        self._ns_peer = None
94
95        if "NETIF" in self.env:
96            if nsim_test is True:
97                raise KsftXfailEx("Test only works on netdevsim")
98            self._check_env()
99
100            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
101
102            self.v4 = self.env.get("LOCAL_V4")
103            self.v6 = self.env.get("LOCAL_V6")
104            self.remote_v4 = self.env.get("REMOTE_V4")
105            self.remote_v6 = self.env.get("REMOTE_V6")
106            kind = self.env["REMOTE_TYPE"]
107            args = self.env["REMOTE_ARGS"]
108        else:
109            if nsim_test is False:
110                raise KsftXfailEx("Test does not work on netdevsim")
111
112            self.create_local()
113
114            self.dev = self._ns.nsims[0].dev
115
116            self.v4 = self.nsim_v4_pfx + "1"
117            self.v6 = self.nsim_v6_pfx + "1"
118            self.remote_v4 = self.nsim_v4_pfx + "2"
119            self.remote_v6 = self.nsim_v6_pfx + "2"
120            kind = "netns"
121            args = self._netns.name
122
123        self.remote = Remote(kind, args, src_path)
124
125        self.addr = self.v6 if self.v6 else self.v4
126        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
127
128        self.addr_ipver = "6" if self.v6 else "4"
129        # Bracketed addresses, some commands need IPv6 to be inside []
130        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
131        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
132
133        self.ifname = self.dev['ifname']
134        self.ifindex = self.dev['ifindex']
135
136        self._required_cmd = {}
137
138    def create_local(self):
139        self._netns = NetNS()
140        self._ns = NetdevSimDev()
141        self._ns_peer = NetdevSimDev(ns=self._netns)
142
143        with open("/proc/self/ns/net") as nsfd0, \
144             open("/var/run/netns/" + self._netns.name) as nsfd1:
145            ifi0 = self._ns.nsims[0].ifindex
146            ifi1 = self._ns_peer.nsims[0].ifindex
147            NetdevSimDev.ctrl_write('link_device',
148                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
149
150        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
151        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
152        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
153
154        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
155        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
156        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
157
158    def _check_env(self):
159        vars_needed = [
160            ["LOCAL_V4", "LOCAL_V6"],
161            ["REMOTE_V4", "REMOTE_V6"],
162            ["REMOTE_TYPE"],
163            ["REMOTE_ARGS"]
164        ]
165        missing = []
166
167        for choice in vars_needed:
168            for entry in choice:
169                if entry in self.env:
170                    break
171            else:
172                missing.append(choice)
173        # Make sure v4 / v6 configs are symmetric
174        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
175            missing.append(["LOCAL_V6", "REMOTE_V6"])
176        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
177            missing.append(["LOCAL_V4", "REMOTE_V4"])
178        if missing:
179            raise Exception("Invalid environment, missing configuration:", missing,
180                            "Please see tools/testing/selftests/drivers/net/README.rst")
181
182    def __enter__(self):
183        return self
184
185    def __exit__(self, ex_type, ex_value, ex_tb):
186        """
187        __exit__ gets called at the end of a "with" block.
188        """
189        self.__del__()
190
191    def __del__(self):
192        if self._ns:
193            self._ns.remove()
194            self._ns = None
195        if self._ns_peer:
196            self._ns_peer.remove()
197            self._ns_peer = None
198        if self._netns:
199            del self._netns
200            self._netns = None
201        if self.remote:
202            del self.remote
203            self.remote = None
204
205    def require_v4(self):
206        if not self.v4 or not self.remote_v4:
207            raise KsftSkipEx("Test requires IPv4 connectivity")
208
209    def require_v6(self):
210        if not self.v6 or not self.remote_v6:
211            raise KsftSkipEx("Test requires IPv6 connectivity")
212
213    def _require_cmd(self, comm, key, host=None):
214        cached = self._required_cmd.get(comm, {})
215        if cached.get(key) is None:
216            cached[key] = cmd("command -v -- " + comm, fail=False,
217                              shell=True, host=host).ret == 0
218        self._required_cmd[comm] = cached
219        return cached[key]
220
221    def require_cmd(self, comm, local=True, remote=False):
222        if local:
223            if not self._require_cmd(comm, "local"):
224                raise KsftSkipEx("Test requires command: " + comm)
225        if remote:
226            if not self._require_cmd(comm, "remote"):
227                raise KsftSkipEx("Test requires (remote) command: " + comm)
228
229    def wait_hw_stats_settle(self):
230        """
231        Wait for HW stats to become consistent, some devices DMA HW stats
232        periodically so events won't be reflected until next sync.
233        Good drivers will tell us via ethtool what their sync period is.
234        """
235        if self._stats_settle_time is None:
236            data = ethtool("-c " + self.ifname, json=True)[0]
237
238            self._stats_settle_time = 0.025 + \
239                data.get('stats-block-usecs', 0) / 1000 / 1000
240
241        time.sleep(self._stats_settle_time)
242