# SPDX-License-Identifier: GPL-2.0

import os
import shlex
from pathlib import Path
from lib.py import KsftSkipEx
from lib.py import cmd, ip
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


def _load_env_file(src_path):
    """
    Return a copy of os.environ overlaid with the settings found in the
    "net.config" file located in the same directory as @src_path.

    The file holds one KEY=VALUE pair per line. Anything after a '#' is
    treated as a comment, blank lines are skipped and whitespace around
    the key and the value is dropped. If the file does not exist the
    plain copy of the process environment is returned.

    Raises ValueError for a non-empty line which is not a KEY=VALUE pair.
    """
    env = os.environ.copy()

    src_dir = Path(src_path).parent.resolve()
    cfg_path = src_dir / "net.config"
    if not cfg_path.exists():
        return env

    # Parse line by line rather than with shlex: the default shlex lexer
    # breaks values at characters like '.', ':' and '@', which mangles
    # IP addresses and user@host strings.
    with open(cfg_path.as_posix(), 'r') as fp:
        for raw_line in fp:
            # Strip comments and surrounding whitespace
            line = raw_line.split('#', maxsplit=1)[0].strip()
            if not line:
                continue

            key, sep, value = line.partition('=')
            key = key.strip()
            if not sep or not key:
                raise ValueError(f"Can't parse configuration line: {line}")
            env[key] = value.strip()
    return env


class NetDrvEnv:
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path):
        """
        Use the interface named by NETIF from the environment / net.config
        if it is set, otherwise create a netdevsim device and use its
        first port.
        """
        # Set before anything can fail, __del__ depends on it
        self._ns = None

        self.env = _load_env_file(src_path)

        if 'NETIF' in self.env:
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
        else:
            self._ns = NetdevSimDev()
            self.dev = self._ns.nsims[0].dev
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Only a netdevsim we created ourselves needs removing; clearing
        # the reference makes repeated calls harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv:
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path):
        """
        Set up the local device and the remote endpoint.

        With NETIF set in the environment / net.config the addresses come
        from LOCAL_V4 / LOCAL_V6 / REMOTE_V4 / REMOTE_V6 and the remote is
        described by REMOTE_TYPE and REMOTE_ARGS (both must be present).
        Without NETIF a local netdevsim pair is created instead.
        """
        # Things we try to destroy. Initialize them before anything that
        # may raise, so that __del__ always finds the attributes.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.env = _load_env_file(src_path)

        if "NETIF" in self.env:
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]

            self.v4 = self.env.get("LOCAL_V4")
            self.v6 = self.env.get("LOCAL_V6")
            self.remote_v4 = self.env.get("REMOTE_V4")
            self.remote_v6 = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.v4 = self.nsim_v4_pfx + "1"
            self.v6 = self.nsim_v6_pfx + "1"
            self.remote_v4 = self.nsim_v4_pfx + "2"
            self.remote_v6 = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 whenever it is configured
        self.addr = self.v6 if self.v6 else self.v4
        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4

        self.addr_ipver = "6" if self.v6 else "4"
        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # Cache of command availability checks: {comm: {key: bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Create a network namespace and two netdevsim devices, one in the
        current namespace and one in the new one, link their first ports
        together and give both ends IPv4 and IPv6 addresses.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link request identifies each port as <netns fd>:<ifindex>,
        # so both namespace files must stay open across the write.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Each reference is reset to None after release, so running this
        # from both __exit__ and garbage collection is harmless.
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_v4(self):
        """
        Skip the test unless both ends have an IPv4 address.
        """
        if not self.v4 or not self.remote_v4:
            raise KsftSkipEx("Test requires IPv4 connectivity")

    def require_v6(self):
        """
        Skip the test unless both ends have an IPv6 address.
        """
        if not self.v6 or not self.remote_v6:
            raise KsftSkipEx("Test requires IPv6 connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if "command -v" succeeds for @comm. The result is
        cached under (@comm, @key); @host is handed to cmd() unchanged.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test if @comm is not available locally (@local) and / or
        on the remote endpoint (@remote).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # Must run the check on the remote endpoint, without host=
            # the lookup would silently test the local machine again.
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)