# SPDX-License-Identifier: GPL-2.0

import os
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import cmd, ip
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


def _load_env_file(src_path):
    """
    Build the environment dict used by the env classes below.

    Starts from a copy of os.environ and overlays KEY=VALUE pairs read
    from a "net.config" file located in the same directory as @src_path.
    If that file does not exist the plain copy of os.environ is returned.

    Everything after a '#' on a line is treated as a comment; lines which
    are empty after stripping are skipped.

    Raises Exception if a non-empty line does not contain '='.
    """
    env = os.environ.copy()

    config = Path(src_path).parent.resolve() / "net.config"
    if not config.exists():
        return env

    with open(config.as_posix(), 'r') as fp:
        for raw_line in fp:
            # Strip comments, then surrounding whitespace
            line = raw_line.partition("#")[0].strip()
            if not line:
                continue
            # Split on the first '=' only, the value may contain more
            pair = line.split('=', maxsplit=1)
            if len(pair) != 2:
                # Report the unmodified line to make it easy to locate
                raise Exception("Can't parse configuration line:", raw_line)
            env[pair[0]] = pair[1]
    return env


class NetDrvEnv:
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path):
        # Set before anything can raise, __del__ depends on it
        self._ns = None

        self.env = _load_env_file(src_path)

        if 'NETIF' in self.env:
            # Use the interface selected by the environment / net.config
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
        else:
            # No interface configured, fall back to a netdevsim device
            self._ns = NetdevSimDev()
            self.dev = self._ns.nsims[0].dev
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Clearing the reference makes repeated calls (explicit call from
        # __exit__ followed by garbage collection) harmless
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv:
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device, the addresses and the remote endpoint.

        @src_path is used to locate net.config and is passed to Remote().
        @nsim_test restricts where the test may run: True raises
        KsftXfailEx when NETIF is configured, False raises KsftXfailEx
        when NETIF is not configured (netdevsim would be used), None
        allows both.
        """
        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        # Load the config only after the attributes above exist, so that
        # __del__ works on a partially constructed object if parsing fails
        self.env = _load_env_file(src_path)

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]

            self.v4 = self.env.get("LOCAL_V4")
            self.v6 = self.env.get("LOCAL_V6")
            self.remote_v4 = self.env.get("REMOTE_V4")
            self.remote_v6 = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.v4 = self.nsim_v4_pfx + "1"
            self.v6 = self.nsim_v6_pfx + "1"
            self.remote_v4 = self.nsim_v4_pfx + "2"
            self.remote_v6 = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 whenever it is configured
        self.addr = self.v6 if self.v6 else self.v4
        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4

        self.addr_ipver = "6" if self.v6 else "4"
        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # Cache of command availability: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create the netdevsim based setup used when NETIF is not configured.

        Creates a network namespace, one netdevsim device in the current
        namespace and one inside the new namespace, links the two and
        assigns the ...1 addresses locally and the ...2 addresses to the
        peer, then brings both interfaces up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device takes "<netns fd>:<ifindex>" for each end, the files
        # only need to stay open for the duration of the write
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration for running against a real NETIF.

        Each inner list of vars_needed is a set of alternatives, at least
        one of which has to be present in self.env. On top of that the
        local and remote side must agree on which IP versions are
        configured.

        Raises Exception listing the missing groups if validation fails.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # None of the alternatives is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Every step resets its attribute to None so that a second call
        # (explicit call from __exit__, then garbage collection) is a no-op
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            # Drop our reference, then restore the attribute for the
            # checks above (presumably NetNS cleans up when released)
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_v4(self):
        """
        Raise KsftSkipEx unless both local and remote IPv4 addresses are set.
        """
        if not self.v4 or not self.remote_v4:
            raise KsftSkipEx("Test requires IPv4 connectivity")

    def require_v6(self):
        """
        Raise KsftSkipEx unless both local and remote IPv6 addresses are set.
        """
        if not self.v6 or not self.remote_v6:
            raise KsftSkipEx("Test requires IPv6 connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if "command -v" finds @comm, caching the result.

        @key selects the cache slot for the result ("local" or "remote"),
        @host is handed to cmd() to select where the check is executed.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            # fail=False: a missing command is an answer, not an error
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if @comm is not available where it is required.

        @local requests a check on the local system, @remote requests
        a check on the remote endpoint.
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # The check has to be executed on the remote endpoint, without
            # host= it would silently test the local system once more
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)