# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments.

    Stores the path of the test which constructed the env and loads
    the environment variables (process environment, optionally extended
    by a "net.config" file located next to the test).
    """
    def __init__(self, src_path):
        self.src_path = src_path
        self.env = self._load_env_file()

    def rpath(self, path):
        """
        Get an absolute path to a file based on a path relative to the directory
        containing the test which constructed env.

        For example, if the test.py is in the same directory as
        a binary (built from helper.c), the test can use env.rpath("helper")
        to get the absolute path to the binary
        """
        src_dir = Path(self.src_path).parent.resolve()
        return (src_dir / path).as_posix()

    def _load_env_file(self):
        """
        Build the environment dict for the test.

        Starts from a copy of os.environ. If a "net.config" file exists in
        the directory of the test, every non-empty line of it (after
        stripping '#' comments and surrounding whitespace) must have the
        form KEY=VALUE and is added to the environment, overriding existing
        keys. The resulting dict is passed through ksft_setup() and its
        result is returned.

        Raises Exception if a config line does not contain '='.
        """
        env = os.environ.copy()

        src_dir = Path(self.src_path).parent.resolve()
        if not (src_dir / "net.config").exists():
            return ksft_setup(env)

        with open((src_dir / "net.config").as_posix(), 'r') as fp:
            for line in fp:
                # Keep the unmodified line for the error message
                full_file = line
                # Strip comments
                pos = line.find("#")
                if pos >= 0:
                    line = line[:pos]
                line = line.strip()
                if not line:
                    continue
                # Split on the first '=' only, values may contain '='
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    raise Exception("Can't parse configuration line:", full_file)
                env[pair[0]] = pair[1]
        return ksft_setup(env)


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, **kwargs):
        # Set up cleanup state first, __del__ runs even if loading
        # the environment in the base class raises.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            # Use the interface selected by the environment
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
        else:
            # No interface given, create a netdevsim device (kwargs are
            # passed through to NetdevSimDev)
            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Only the netdevsim device we created needs removing; clearing
        # the reference makes repeated calls harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device and the remote endpoint.

        nsim_test restricts where the test may run: True means the test
        only works on netdevsim, False means it does not work on netdevsim,
        None (default) means no restriction. KsftXfailEx is raised when
        the restriction does not match the environment.
        """
        # Set up cleanup state first, __del__ runs even if loading
        # the environment in the base class raises.
        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]

            self.v4 = self.env.get("LOCAL_V4")
            self.v6 = self.env.get("LOCAL_V6")
            self.remote_v4 = self.env.get("REMOTE_V4")
            self.remote_v6 = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.v4 = self.nsim_v4_pfx + "1"
            self.v6 = self.nsim_v6_pfx + "1"
            self.remote_v4 = self.nsim_v4_pfx + "2"
            self.remote_v6 = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 when it is configured, fall back to IPv4
        self.addr = self.v6 if self.v6 else self.v4
        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4

        self.addr_ipver = "6" if self.v6 else "4"
        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # Cache of command availability checks: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim
        device in the current namespace and one in the new namespace,
        link the two devices together, then assign the nsim_v4_pfx /
        nsim_v6_pfx addresses (".1"/"::1" locally, ".2"/"::2" for the peer)
        and bring both interfaces up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link_device control file takes "<netns fd>:<ifindex>" pairs,
        # so the namespace files must stay open during the write.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration found in self.env.

        At least one variable from each group in vars_needed must be set,
        and the v4 / v6 address settings must be given for both the local
        and the remote side (or for neither).

        Raises Exception listing the missing configuration.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # None of the alternatives in this group is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Each reference is cleared after cleanup, so calling this again
        # (e.g. from __exit__ and later from the interpreter) is harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_v4(self):
        """Raise KsftSkipEx unless both local and remote IPv4 addresses are set."""
        if not self.v4 or not self.remote_v4:
            raise KsftSkipEx("Test requires IPv4 connectivity")

    def require_v6(self):
        """Raise KsftSkipEx unless both local and remote IPv6 addresses are set."""
        if not self.v6 or not self.remote_v6:
            raise KsftSkipEx("Test requires IPv6 connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm is available, using "command -v".

        The result is cached per command under key, host is passed
        to cmd() to select where the check is executed.
        Returns True if the command was found.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) or on the remote endpoint (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # The check must be executed on the remote endpoint, without
            # host= the command would be looked up on the local system.
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Lack of coalescing support is fine, use the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)