1# SPDX-License-Identifier: GPL-2.0 2 3import os 4import time 5from pathlib import Path 6from lib.py import KsftSkipEx, KsftXfailEx 7from lib.py import cmd, ethtool, ip 8from lib.py import NetNS, NetdevSimDev 9from .remote import Remote 10 11 12def _load_env_file(src_path): 13 env = os.environ.copy() 14 15 src_dir = Path(src_path).parent.resolve() 16 if not (src_dir / "net.config").exists(): 17 return env 18 19 with open((src_dir / "net.config").as_posix(), 'r') as fp: 20 for line in fp.readlines(): 21 full_file = line 22 # Strip comments 23 pos = line.find("#") 24 if pos >= 0: 25 line = line[:pos] 26 line = line.strip() 27 if not line: 28 continue 29 pair = line.split('=', maxsplit=1) 30 if len(pair) != 2: 31 raise Exception("Can't parse configuration line:", full_file) 32 env[pair[0]] = pair[1] 33 return env 34 35 36class NetDrvEnv: 37 """ 38 Class for a single NIC / host env, with no remote end 39 """ 40 def __init__(self, src_path, **kwargs): 41 self._ns = None 42 43 self.env = _load_env_file(src_path) 44 45 if 'NETIF' in self.env: 46 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 47 else: 48 self._ns = NetdevSimDev(**kwargs) 49 self.dev = self._ns.nsims[0].dev 50 self.ifindex = self.dev['ifindex'] 51 52 def __enter__(self): 53 ip(f"link set dev {self.dev['ifname']} up") 54 55 return self 56 57 def __exit__(self, ex_type, ex_value, ex_tb): 58 """ 59 __exit__ gets called at the end of a "with" block. 60 """ 61 self.__del__() 62 63 def __del__(self): 64 if self._ns: 65 self._ns.remove() 66 self._ns = None 67 68 69class NetDrvEpEnv: 70 """ 71 Class for an environment with a local device and "remote endpoint" 72 which can be used to send traffic in. 73 74 For local testing it creates two network namespaces and a pair 75 of netdevsim devices. 76 """ 77 78 # Network prefixes used for local tests 79 nsim_v4_pfx = "192.0.2." 80 nsim_v6_pfx = "2001:db8::" 81 82 def __init__(self, src_path, nsim_test=None): 83 84 self.env = _load_env_file(src_path) 85 86 self._stats_settle_time = None 87 88 # Things we try to destroy 89 self.remote = None 90 # These are for local testing state 91 self._netns = None 92 self._ns = None 93 self._ns_peer = None 94 95 if "NETIF" in self.env: 96 if nsim_test is True: 97 raise KsftXfailEx("Test only works on netdevsim") 98 self._check_env() 99 100 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 101 102 self.v4 = self.env.get("LOCAL_V4") 103 self.v6 = self.env.get("LOCAL_V6") 104 self.remote_v4 = self.env.get("REMOTE_V4") 105 self.remote_v6 = self.env.get("REMOTE_V6") 106 kind = self.env["REMOTE_TYPE"] 107 args = self.env["REMOTE_ARGS"] 108 else: 109 if nsim_test is False: 110 raise KsftXfailEx("Test does not work on netdevsim") 111 112 self.create_local() 113 114 self.dev = self._ns.nsims[0].dev 115 116 self.v4 = self.nsim_v4_pfx + "1" 117 self.v6 = self.nsim_v6_pfx + "1" 118 self.remote_v4 = self.nsim_v4_pfx + "2" 119 self.remote_v6 = self.nsim_v6_pfx + "2" 120 kind = "netns" 121 args = self._netns.name 122 123 self.remote = Remote(kind, args, src_path) 124 125 self.addr = self.v6 if self.v6 else self.v4 126 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4 127 128 self.addr_ipver = "6" if self.v6 else "4" 129 # Bracketed addresses, some commands need IPv6 to be inside [] 130 self.baddr = f"[{self.v6}]" if self.v6 else self.v4 131 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4 132 133 self.ifname = self.dev['ifname'] 134 self.ifindex = self.dev['ifindex'] 135 136 self._required_cmd = {} 137 138 def create_local(self): 139 self._netns = NetNS() 140 self._ns = NetdevSimDev() 141 self._ns_peer = NetdevSimDev(ns=self._netns) 142 143 with open("/proc/self/ns/net") as nsfd0, \ 144 open("/var/run/netns/" + self._netns.name) as nsfd1: 145 ifi0 = self._ns.nsims[0].ifindex 146 ifi1 = self._ns_peer.nsims[0].ifindex 147 NetdevSimDev.ctrl_write('link_device', 148 f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}') 149 150 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24") 151 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad") 152 ip(f" link set dev {self._ns.nsims[0].ifname} up") 153 154 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns) 155 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns) 156 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns) 157 158 def _check_env(self): 159 vars_needed = [ 160 ["LOCAL_V4", "LOCAL_V6"], 161 ["REMOTE_V4", "REMOTE_V6"], 162 ["REMOTE_TYPE"], 163 ["REMOTE_ARGS"] 164 ] 165 missing = [] 166 167 for choice in vars_needed: 168 for entry in choice: 169 if entry in self.env: 170 break 171 else: 172 missing.append(choice) 173 # Make sure v4 / v6 configs are symmetric 174 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env): 175 missing.append(["LOCAL_V6", "REMOTE_V6"]) 176 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env): 177 missing.append(["LOCAL_V4", "REMOTE_V4"]) 178 if missing: 179 raise Exception("Invalid environment, missing configuration:", missing, 180 "Please see tools/testing/selftests/drivers/net/README.rst") 181 182 def __enter__(self): 183 return self 184 185 def __exit__(self, ex_type, ex_value, ex_tb): 186 """ 187 __exit__ gets called at the end of a "with" block. 188 """ 189 self.__del__() 190 191 def __del__(self): 192 if self._ns: 193 self._ns.remove() 194 self._ns = None 195 if self._ns_peer: 196 self._ns_peer.remove() 197 self._ns_peer = None 198 if self._netns: 199 del self._netns 200 self._netns = None 201 if self.remote: 202 del self.remote 203 self.remote = None 204 205 def require_v4(self): 206 if not self.v4 or not self.remote_v4: 207 raise KsftSkipEx("Test requires IPv4 connectivity") 208 209 def require_v6(self): 210 if not self.v6 or not self.remote_v6: 211 raise KsftSkipEx("Test requires IPv6 connectivity") 212 213 def _require_cmd(self, comm, key, host=None): 214 cached = self._required_cmd.get(comm, {}) 215 if cached.get(key) is None: 216 cached[key] = cmd("command -v -- " + comm, fail=False, 217 shell=True, host=host).ret == 0 218 self._required_cmd[comm] = cached 219 return cached[key] 220 221 def require_cmd(self, comm, local=True, remote=False): 222 if local: 223 if not self._require_cmd(comm, "local"): 224 raise KsftSkipEx("Test requires command: " + comm) 225 if remote: 226 if not self._require_cmd(comm, "remote"): 227 raise KsftSkipEx("Test requires (remote) command: " + comm) 228 229 def wait_hw_stats_settle(self): 230 """ 231 Wait for HW stats to become consistent, some devices DMA HW stats 232 periodically so events won't be reflected until next sync. 233 Good drivers will tell us via ethtool what their sync period is. 234 """ 235 if self._stats_settle_time is None: 236 data = ethtool("-c " + self.ifname, json=True)[0] 237 238 self._stats_settle_time = 0.025 + \ 239 data.get('stats-block-usecs', 0) / 1000 / 1000 240 241 time.sleep(self._stats_settle_time) 242