xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision b6499840cafca25175f43ebd601913bf31d06f16)
1 # SPDX-License-Identifier: GPL-2.0
2 
3 import os
4 import time
5 from pathlib import Path
6 from lib.py import KsftSkipEx, KsftXfailEx
7 from lib.py import cmd, ethtool, ip
8 from lib.py import NetNS, NetdevSimDev
9 from .remote import Remote
10 
11 
12 def _load_env_file(src_path):
13     env = os.environ.copy()
14 
15     src_dir = Path(src_path).parent.resolve()
16     if not (src_dir / "net.config").exists():
17         return env
18 
19     with open((src_dir / "net.config").as_posix(), 'r') as fp:
20         for line in fp.readlines():
21             full_file = line
22             # Strip comments
23             pos = line.find("#")
24             if pos >= 0:
25                 line = line[:pos]
26             line = line.strip()
27             if not line:
28                 continue
29             pair = line.split('=', maxsplit=1)
30             if len(pair) != 2:
31                 raise Exception("Can't parse configuration line:", full_file)
32             env[pair[0]] = pair[1]
33     return env
34 
35 
36 class NetDrvEnv:
37     """
38     Class for a single NIC / host env, with no remote end
39     """
40     def __init__(self, src_path, **kwargs):
41         self._ns = None
42 
43         self.env = _load_env_file(src_path)
44 
45         if 'NETIF' in self.env:
46             self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
47         else:
48             self._ns = NetdevSimDev(**kwargs)
49             self.dev = self._ns.nsims[0].dev
50         self.ifindex = self.dev['ifindex']
51 
52     def __enter__(self):
53         ip(f"link set dev {self.dev['ifname']} up")
54 
55         return self
56 
57     def __exit__(self, ex_type, ex_value, ex_tb):
58         """
59         __exit__ gets called at the end of a "with" block.
60         """
61         self.__del__()
62 
63     def __del__(self):
64         if self._ns:
65             self._ns.remove()
66             self._ns = None
67 
68 
69 class NetDrvEpEnv:
70     """
71     Class for an environment with a local device and "remote endpoint"
72     which can be used to send traffic in.
73 
74     For local testing it creates two network namespaces and a pair
75     of netdevsim devices.
76     """
77 
78     # Network prefixes used for local tests
79     nsim_v4_pfx = "192.0.2."
80     nsim_v6_pfx = "2001:db8::"
81 
82     def __init__(self, src_path, nsim_test=None):
83 
84         self.env = _load_env_file(src_path)
85 
86         self._stats_settle_time = None
87 
88         # Things we try to destroy
89         self.remote = None
90         # These are for local testing state
91         self._netns = None
92         self._ns = None
93         self._ns_peer = None
94 
95         if "NETIF" in self.env:
96             if nsim_test is True:
97                 raise KsftXfailEx("Test only works on netdevsim")
98             self._check_env()
99 
100             self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
101 
102             self.v4 = self.env.get("LOCAL_V4")
103             self.v6 = self.env.get("LOCAL_V6")
104             self.remote_v4 = self.env.get("REMOTE_V4")
105             self.remote_v6 = self.env.get("REMOTE_V6")
106             kind = self.env["REMOTE_TYPE"]
107             args = self.env["REMOTE_ARGS"]
108         else:
109             if nsim_test is False:
110                 raise KsftXfailEx("Test does not work on netdevsim")
111 
112             self.create_local()
113 
114             self.dev = self._ns.nsims[0].dev
115 
116             self.v4 = self.nsim_v4_pfx + "1"
117             self.v6 = self.nsim_v6_pfx + "1"
118             self.remote_v4 = self.nsim_v4_pfx + "2"
119             self.remote_v6 = self.nsim_v6_pfx + "2"
120             kind = "netns"
121             args = self._netns.name
122 
123         self.remote = Remote(kind, args, src_path)
124 
125         self.addr = self.v6 if self.v6 else self.v4
126         self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
127 
128         self.addr_ipver = "6" if self.v6 else "4"
129         # Bracketed addresses, some commands need IPv6 to be inside []
130         self.baddr = f"[{self.v6}]" if self.v6 else self.v4
131         self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
132 
133         self.ifname = self.dev['ifname']
134         self.ifindex = self.dev['ifindex']
135 
136         self._required_cmd = {}
137 
138     def create_local(self):
139         self._netns = NetNS()
140         self._ns = NetdevSimDev()
141         self._ns_peer = NetdevSimDev(ns=self._netns)
142 
143         with open("/proc/self/ns/net") as nsfd0, \
144              open("/var/run/netns/" + self._netns.name) as nsfd1:
145             ifi0 = self._ns.nsims[0].ifindex
146             ifi1 = self._ns_peer.nsims[0].ifindex
147             NetdevSimDev.ctrl_write('link_device',
148                                     f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
149 
150         ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
151         ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
152         ip(f"   link set dev {self._ns.nsims[0].ifname} up")
153 
154         ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
155         ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
156         ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
157 
158     def _check_env(self):
159         vars_needed = [
160             ["LOCAL_V4", "LOCAL_V6"],
161             ["REMOTE_V4", "REMOTE_V6"],
162             ["REMOTE_TYPE"],
163             ["REMOTE_ARGS"]
164         ]
165         missing = []
166 
167         for choice in vars_needed:
168             for entry in choice:
169                 if entry in self.env:
170                     break
171             else:
172                 missing.append(choice)
173         # Make sure v4 / v6 configs are symmetric
174         if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
175             missing.append(["LOCAL_V6", "REMOTE_V6"])
176         if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
177             missing.append(["LOCAL_V4", "REMOTE_V4"])
178         if missing:
179             raise Exception("Invalid environment, missing configuration:", missing,
180                             "Please see tools/testing/selftests/drivers/net/README.rst")
181 
182     def __enter__(self):
183         return self
184 
185     def __exit__(self, ex_type, ex_value, ex_tb):
186         """
187         __exit__ gets called at the end of a "with" block.
188         """
189         self.__del__()
190 
191     def __del__(self):
192         if self._ns:
193             self._ns.remove()
194             self._ns = None
195         if self._ns_peer:
196             self._ns_peer.remove()
197             self._ns_peer = None
198         if self._netns:
199             del self._netns
200             self._netns = None
201         if self.remote:
202             del self.remote
203             self.remote = None
204 
205     def require_v4(self):
206         if not self.v4 or not self.remote_v4:
207             raise KsftSkipEx("Test requires IPv4 connectivity")
208 
209     def require_v6(self):
210         if not self.v6 or not self.remote_v6:
211             raise KsftSkipEx("Test requires IPv6 connectivity")
212 
213     def _require_cmd(self, comm, key, host=None):
214         cached = self._required_cmd.get(comm, {})
215         if cached.get(key) is None:
216             cached[key] = cmd("command -v -- " + comm, fail=False,
217                               shell=True, host=host).ret == 0
218         self._required_cmd[comm] = cached
219         return cached[key]
220 
221     def require_cmd(self, comm, local=True, remote=False):
222         if local:
223             if not self._require_cmd(comm, "local"):
224                 raise KsftSkipEx("Test requires command: " + comm)
225         if remote:
226             if not self._require_cmd(comm, "remote"):
227                 raise KsftSkipEx("Test requires (remote) command: " + comm)
228 
229     def wait_hw_stats_settle(self):
230         """
231         Wait for HW stats to become consistent, some devices DMA HW stats
232         periodically so events won't be reflected until next sync.
233         Good drivers will tell us via ethtool what their sync period is.
234         """
235         if self._stats_settle_time is None:
236             data = ethtool("-c " + self.ifname, json=True)[0]
237 
238             self._stats_settle_time = 0.025 + \
239                 data.get('stats-block-usecs', 0) / 1000 / 1000
240 
241         time.sleep(self._stats_settle_time)
242