1# SPDX-License-Identifier: GPL-2.0 2 3import os 4import time 5from pathlib import Path 6from lib.py import KsftSkipEx, KsftXfailEx 7from lib.py import ksft_setup 8from lib.py import cmd, ethtool, ip, CmdExitFailure 9from lib.py import NetNS, NetdevSimDev 10from .remote import Remote 11 12 13def _load_env_file(src_path): 14 env = os.environ.copy() 15 16 src_dir = Path(src_path).parent.resolve() 17 if not (src_dir / "net.config").exists(): 18 return ksft_setup(env) 19 20 with open((src_dir / "net.config").as_posix(), 'r') as fp: 21 for line in fp.readlines(): 22 full_file = line 23 # Strip comments 24 pos = line.find("#") 25 if pos >= 0: 26 line = line[:pos] 27 line = line.strip() 28 if not line: 29 continue 30 pair = line.split('=', maxsplit=1) 31 if len(pair) != 2: 32 raise Exception("Can't parse configuration line:", full_file) 33 env[pair[0]] = pair[1] 34 return ksft_setup(env) 35 36 37class NetDrvEnv: 38 """ 39 Class for a single NIC / host env, with no remote end 40 """ 41 def __init__(self, src_path, **kwargs): 42 self._ns = None 43 44 self.env = _load_env_file(src_path) 45 46 if 'NETIF' in self.env: 47 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 48 else: 49 self._ns = NetdevSimDev(**kwargs) 50 self.dev = self._ns.nsims[0].dev 51 self.ifname = self.dev['ifname'] 52 self.ifindex = self.dev['ifindex'] 53 54 def __enter__(self): 55 ip(f"link set dev {self.dev['ifname']} up") 56 57 return self 58 59 def __exit__(self, ex_type, ex_value, ex_tb): 60 """ 61 __exit__ gets called at the end of a "with" block. 62 """ 63 self.__del__() 64 65 def __del__(self): 66 if self._ns: 67 self._ns.remove() 68 self._ns = None 69 70 71class NetDrvEpEnv: 72 """ 73 Class for an environment with a local device and "remote endpoint" 74 which can be used to send traffic in. 75 76 For local testing it creates two network namespaces and a pair 77 of netdevsim devices. 78 """ 79 80 # Network prefixes used for local tests 81 nsim_v4_pfx = "192.0.2." 82 nsim_v6_pfx = "2001:db8::" 83 84 def __init__(self, src_path, nsim_test=None): 85 86 self.env = _load_env_file(src_path) 87 88 self._stats_settle_time = None 89 90 # Things we try to destroy 91 self.remote = None 92 # These are for local testing state 93 self._netns = None 94 self._ns = None 95 self._ns_peer = None 96 97 if "NETIF" in self.env: 98 if nsim_test is True: 99 raise KsftXfailEx("Test only works on netdevsim") 100 self._check_env() 101 102 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 103 104 self.v4 = self.env.get("LOCAL_V4") 105 self.v6 = self.env.get("LOCAL_V6") 106 self.remote_v4 = self.env.get("REMOTE_V4") 107 self.remote_v6 = self.env.get("REMOTE_V6") 108 kind = self.env["REMOTE_TYPE"] 109 args = self.env["REMOTE_ARGS"] 110 else: 111 if nsim_test is False: 112 raise KsftXfailEx("Test does not work on netdevsim") 113 114 self.create_local() 115 116 self.dev = self._ns.nsims[0].dev 117 118 self.v4 = self.nsim_v4_pfx + "1" 119 self.v6 = self.nsim_v6_pfx + "1" 120 self.remote_v4 = self.nsim_v4_pfx + "2" 121 self.remote_v6 = self.nsim_v6_pfx + "2" 122 kind = "netns" 123 args = self._netns.name 124 125 self.remote = Remote(kind, args, src_path) 126 127 self.addr = self.v6 if self.v6 else self.v4 128 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4 129 130 self.addr_ipver = "6" if self.v6 else "4" 131 # Bracketed addresses, some commands need IPv6 to be inside [] 132 self.baddr = f"[{self.v6}]" if self.v6 else self.v4 133 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4 134 135 self.ifname = self.dev['ifname'] 136 self.ifindex = self.dev['ifindex'] 137 138 self._required_cmd = {} 139 140 def create_local(self): 141 self._netns = NetNS() 142 self._ns = NetdevSimDev() 143 self._ns_peer = NetdevSimDev(ns=self._netns) 144 145 with open("/proc/self/ns/net") as nsfd0, \ 146 open("/var/run/netns/" + self._netns.name) as nsfd1: 147 ifi0 = self._ns.nsims[0].ifindex 148 ifi1 = self._ns_peer.nsims[0].ifindex 149 NetdevSimDev.ctrl_write('link_device', 150 f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}') 151 152 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24") 153 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad") 154 ip(f" link set dev {self._ns.nsims[0].ifname} up") 155 156 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns) 157 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns) 158 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns) 159 160 def _check_env(self): 161 vars_needed = [ 162 ["LOCAL_V4", "LOCAL_V6"], 163 ["REMOTE_V4", "REMOTE_V6"], 164 ["REMOTE_TYPE"], 165 ["REMOTE_ARGS"] 166 ] 167 missing = [] 168 169 for choice in vars_needed: 170 for entry in choice: 171 if entry in self.env: 172 break 173 else: 174 missing.append(choice) 175 # Make sure v4 / v6 configs are symmetric 176 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env): 177 missing.append(["LOCAL_V6", "REMOTE_V6"]) 178 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env): 179 missing.append(["LOCAL_V4", "REMOTE_V4"]) 180 if missing: 181 raise Exception("Invalid environment, missing configuration:", missing, 182 "Please see tools/testing/selftests/drivers/net/README.rst") 183 184 def __enter__(self): 185 return self 186 187 def __exit__(self, ex_type, ex_value, ex_tb): 188 """ 189 __exit__ gets called at the end of a "with" block. 190 """ 191 self.__del__() 192 193 def __del__(self): 194 if self._ns: 195 self._ns.remove() 196 self._ns = None 197 if self._ns_peer: 198 self._ns_peer.remove() 199 self._ns_peer = None 200 if self._netns: 201 del self._netns 202 self._netns = None 203 if self.remote: 204 del self.remote 205 self.remote = None 206 207 def require_v4(self): 208 if not self.v4 or not self.remote_v4: 209 raise KsftSkipEx("Test requires IPv4 connectivity") 210 211 def require_v6(self): 212 if not self.v6 or not self.remote_v6: 213 raise KsftSkipEx("Test requires IPv6 connectivity") 214 215 def _require_cmd(self, comm, key, host=None): 216 cached = self._required_cmd.get(comm, {}) 217 if cached.get(key) is None: 218 cached[key] = cmd("command -v -- " + comm, fail=False, 219 shell=True, host=host).ret == 0 220 self._required_cmd[comm] = cached 221 return cached[key] 222 223 def require_cmd(self, comm, local=True, remote=False): 224 if local: 225 if not self._require_cmd(comm, "local"): 226 raise KsftSkipEx("Test requires command: " + comm) 227 if remote: 228 if not self._require_cmd(comm, "remote"): 229 raise KsftSkipEx("Test requires (remote) command: " + comm) 230 231 def wait_hw_stats_settle(self): 232 """ 233 Wait for HW stats to become consistent, some devices DMA HW stats 234 periodically so events won't be reflected until next sync. 235 Good drivers will tell us via ethtool what their sync period is. 236 """ 237 if self._stats_settle_time is None: 238 data = {} 239 try: 240 data = ethtool("-c " + self.ifname, json=True)[0] 241 except CmdExitFailure as e: 242 if "Operation not supported" not in e.cmd.stderr: 243 raise 244 245 self._stats_settle_time = 0.025 + \ 246 data.get('stats-block-usecs', 0) / 1000 / 1000 247 248 time.sleep(self._stats_settle_time) 249