1# SPDX-License-Identifier: GPL-2.0 2 3import os 4from pathlib import Path 5from lib.py import KsftSkipEx 6from lib.py import cmd, ip 7from lib.py import NetNS, NetdevSimDev 8from .remote import Remote 9 10 11def _load_env_file(src_path): 12 env = os.environ.copy() 13 14 src_dir = Path(src_path).parent.resolve() 15 if not (src_dir / "net.config").exists(): 16 return env 17 18 with open((src_dir / "net.config").as_posix(), 'r') as fp: 19 for line in fp.readlines(): 20 full_file = line 21 # Strip comments 22 pos = line.find("#") 23 if pos >= 0: 24 line = line[:pos] 25 line = line.strip() 26 if not line: 27 continue 28 pair = line.split('=', maxsplit=1) 29 if len(pair) != 2: 30 raise Exception("Can't parse configuration line:", full_file) 31 env[pair[0]] = pair[1] 32 return env 33 34 35class NetDrvEnv: 36 """ 37 Class for a single NIC / host env, with no remote end 38 """ 39 def __init__(self, src_path): 40 self._ns = None 41 42 self.env = _load_env_file(src_path) 43 44 if 'NETIF' in self.env: 45 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 46 else: 47 self._ns = NetdevSimDev() 48 self.dev = self._ns.nsims[0].dev 49 self.ifindex = self.dev['ifindex'] 50 51 def __enter__(self): 52 return self 53 54 def __exit__(self, ex_type, ex_value, ex_tb): 55 """ 56 __exit__ gets called at the end of a "with" block. 57 """ 58 self.__del__() 59 60 def __del__(self): 61 if self._ns: 62 self._ns.remove() 63 self._ns = None 64 65 66class NetDrvEpEnv: 67 """ 68 Class for an environment with a local device and "remote endpoint" 69 which can be used to send traffic in. 70 71 For local testing it creates two network namespaces and a pair 72 of netdevsim devices. 73 """ 74 75 # Network prefixes used for local tests 76 nsim_v4_pfx = "192.0.2." 77 nsim_v6_pfx = "2001:db8::" 78 79 def __init__(self, src_path): 80 81 self.env = _load_env_file(src_path) 82 83 # Things we try to destroy 84 self.remote = None 85 # These are for local testing state 86 self._netns = None 87 self._ns = None 88 self._ns_peer = None 89 90 if "NETIF" in self.env: 91 self._check_env() 92 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] 93 94 self.v4 = self.env.get("LOCAL_V4") 95 self.v6 = self.env.get("LOCAL_V6") 96 self.remote_v4 = self.env.get("REMOTE_V4") 97 self.remote_v6 = self.env.get("REMOTE_V6") 98 kind = self.env["REMOTE_TYPE"] 99 args = self.env["REMOTE_ARGS"] 100 else: 101 self.create_local() 102 103 self.dev = self._ns.nsims[0].dev 104 105 self.v4 = self.nsim_v4_pfx + "1" 106 self.v6 = self.nsim_v6_pfx + "1" 107 self.remote_v4 = self.nsim_v4_pfx + "2" 108 self.remote_v6 = self.nsim_v6_pfx + "2" 109 kind = "netns" 110 args = self._netns.name 111 112 self.remote = Remote(kind, args, src_path) 113 114 self.addr = self.v6 if self.v6 else self.v4 115 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4 116 117 self.addr_ipver = "6" if self.v6 else "4" 118 # Bracketed addresses, some commands need IPv6 to be inside [] 119 self.baddr = f"[{self.v6}]" if self.v6 else self.v4 120 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4 121 122 self.ifname = self.dev['ifname'] 123 self.ifindex = self.dev['ifindex'] 124 125 self._required_cmd = {} 126 127 def create_local(self): 128 self._netns = NetNS() 129 self._ns = NetdevSimDev() 130 self._ns_peer = NetdevSimDev(ns=self._netns) 131 132 with open("/proc/self/ns/net") as nsfd0, \ 133 open("/var/run/netns/" + self._netns.name) as nsfd1: 134 ifi0 = self._ns.nsims[0].ifindex 135 ifi1 = self._ns_peer.nsims[0].ifindex 136 NetdevSimDev.ctrl_write('link_device', 137 f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}') 138 139 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24") 140 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad") 141 ip(f" link set dev {self._ns.nsims[0].ifname} up") 142 143 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns) 144 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns) 145 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns) 146 147 def _check_env(self): 148 vars_needed = [ 149 ["LOCAL_V4", "LOCAL_V6"], 150 ["REMOTE_V4", "REMOTE_V6"], 151 ["REMOTE_TYPE"], 152 ["REMOTE_ARGS"] 153 ] 154 missing = [] 155 156 for choice in vars_needed: 157 for entry in choice: 158 if entry in self.env: 159 break 160 else: 161 missing.append(choice) 162 # Make sure v4 / v6 configs are symmetric 163 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env): 164 missing.append(["LOCAL_V6", "REMOTE_V6"]) 165 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env): 166 missing.append(["LOCAL_V4", "REMOTE_V4"]) 167 if missing: 168 raise Exception("Invalid environment, missing configuration:", missing, 169 "Please see tools/testing/selftests/drivers/net/README.rst") 170 171 def __enter__(self): 172 return self 173 174 def __exit__(self, ex_type, ex_value, ex_tb): 175 """ 176 __exit__ gets called at the end of a "with" block. 177 """ 178 self.__del__() 179 180 def __del__(self): 181 if self._ns: 182 self._ns.remove() 183 self._ns = None 184 if self._ns_peer: 185 self._ns_peer.remove() 186 self._ns_peer = None 187 if self._netns: 188 del self._netns 189 self._netns = None 190 if self.remote: 191 del self.remote 192 self.remote = None 193 194 def require_v4(self): 195 if not self.v4 or not self.remote_v4: 196 raise KsftSkipEx("Test requires IPv4 connectivity") 197 198 def require_v6(self): 199 if not self.v6 or not self.remote_v6: 200 raise KsftSkipEx("Test requires IPv6 connectivity") 201 202 def _require_cmd(self, comm, key, host=None): 203 cached = self._required_cmd.get(comm, {}) 204 if cached.get(key) is None: 205 cached[key] = cmd("command -v -- " + comm, fail=False, 206 shell=True, host=host).ret == 0 207 self._required_cmd[comm] = cached 208 return cached[key] 209 210 def require_cmd(self, comm, local=True, remote=False): 211 if local: 212 if not self._require_cmd(comm, "local"): 213 raise KsftSkipEx("Test requires command: " + comm) 214 if remote: 215 if not self._require_cmd(comm, "remote"): 216 raise KsftSkipEx("Test requires (remote) command: " + comm) 217