# SPDX-License-Identifier: GPL-2.0 import os import time from pathlib import Path from lib.py import KsftSkipEx, KsftXfailEx from lib.py import ksft_setup from lib.py import cmd, ethtool, ip from lib.py import NetNS, NetdevSimDev from .remote import Remote def _load_env_file(src_path): env = os.environ.copy() src_dir = Path(src_path).parent.resolve() if not (src_dir / "net.config").exists(): return ksft_setup(env) with open((src_dir / "net.config").as_posix(), 'r') as fp: for line in fp.readlines(): full_file = line # Strip comments pos = line.find("#") if pos >= 0: line = line[:pos] line = line.strip() if not line: continue pair = line.split('=', maxsplit=1) if len(pair) != 2: raise Exception("Can't parse configuration line:", full_file) env[pair[0]] = pair[1] return ksft_setup(env) class NetDrvEnv: """ Class for a single NIC / host env, with no remote end """ def __init__(self, src_path, **kwargs): self._ns = None self.env = _load_env_file(src_path) if 'NETIF' in self.env: self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] else: self._ns = NetdevSimDev(**kwargs) self.dev = self._ns.nsims[0].dev self.ifindex = self.dev['ifindex'] def __enter__(self): ip(f"link set dev {self.dev['ifname']} up") return self def __exit__(self, ex_type, ex_value, ex_tb): """ __exit__ gets called at the end of a "with" block. """ self.__del__() def __del__(self): if self._ns: self._ns.remove() self._ns = None class NetDrvEpEnv: """ Class for an environment with a local device and "remote endpoint" which can be used to send traffic in. For local testing it creates two network namespaces and a pair of netdevsim devices. """ # Network prefixes used for local tests nsim_v4_pfx = "192.0.2." nsim_v6_pfx = "2001:db8::" def __init__(self, src_path, nsim_test=None): self.env = _load_env_file(src_path) self._stats_settle_time = None # Things we try to destroy self.remote = None # These are for local testing state self._netns = None self._ns = None self._ns_peer = None if "NETIF" in self.env: if nsim_test is True: raise KsftXfailEx("Test only works on netdevsim") self._check_env() self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0] self.v4 = self.env.get("LOCAL_V4") self.v6 = self.env.get("LOCAL_V6") self.remote_v4 = self.env.get("REMOTE_V4") self.remote_v6 = self.env.get("REMOTE_V6") kind = self.env["REMOTE_TYPE"] args = self.env["REMOTE_ARGS"] else: if nsim_test is False: raise KsftXfailEx("Test does not work on netdevsim") self.create_local() self.dev = self._ns.nsims[0].dev self.v4 = self.nsim_v4_pfx + "1" self.v6 = self.nsim_v6_pfx + "1" self.remote_v4 = self.nsim_v4_pfx + "2" self.remote_v6 = self.nsim_v6_pfx + "2" kind = "netns" args = self._netns.name self.remote = Remote(kind, args, src_path) self.addr = self.v6 if self.v6 else self.v4 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4 self.addr_ipver = "6" if self.v6 else "4" # Bracketed addresses, some commands need IPv6 to be inside [] self.baddr = f"[{self.v6}]" if self.v6 else self.v4 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4 self.ifname = self.dev['ifname'] self.ifindex = self.dev['ifindex'] self._required_cmd = {} def create_local(self): self._netns = NetNS() self._ns = NetdevSimDev() self._ns_peer = NetdevSimDev(ns=self._netns) with open("/proc/self/ns/net") as nsfd0, \ open("/var/run/netns/" + self._netns.name) as nsfd1: ifi0 = self._ns.nsims[0].ifindex ifi1 = self._ns_peer.nsims[0].ifindex NetdevSimDev.ctrl_write('link_device', f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}') ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24") ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad") ip(f" link set dev {self._ns.nsims[0].ifname} up") ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns) ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns) ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns) def _check_env(self): vars_needed = [ ["LOCAL_V4", "LOCAL_V6"], ["REMOTE_V4", "REMOTE_V6"], ["REMOTE_TYPE"], ["REMOTE_ARGS"] ] missing = [] for choice in vars_needed: for entry in choice: if entry in self.env: break else: missing.append(choice) # Make sure v4 / v6 configs are symmetric if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env): missing.append(["LOCAL_V6", "REMOTE_V6"]) if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env): missing.append(["LOCAL_V4", "REMOTE_V4"]) if missing: raise Exception("Invalid environment, missing configuration:", missing, "Please see tools/testing/selftests/drivers/net/README.rst") def __enter__(self): return self def __exit__(self, ex_type, ex_value, ex_tb): """ __exit__ gets called at the end of a "with" block. """ self.__del__() def __del__(self): if self._ns: self._ns.remove() self._ns = None if self._ns_peer: self._ns_peer.remove() self._ns_peer = None if self._netns: del self._netns self._netns = None if self.remote: del self.remote self.remote = None def require_v4(self): if not self.v4 or not self.remote_v4: raise KsftSkipEx("Test requires IPv4 connectivity") def require_v6(self): if not self.v6 or not self.remote_v6: raise KsftSkipEx("Test requires IPv6 connectivity") def _require_cmd(self, comm, key, host=None): cached = self._required_cmd.get(comm, {}) if cached.get(key) is None: cached[key] = cmd("command -v -- " + comm, fail=False, shell=True, host=host).ret == 0 self._required_cmd[comm] = cached return cached[key] def require_cmd(self, comm, local=True, remote=False): if local: if not self._require_cmd(comm, "local"): raise KsftSkipEx("Test requires command: " + comm) if remote: if not self._require_cmd(comm, "remote"): raise KsftSkipEx("Test requires (remote) command: " + comm) def wait_hw_stats_settle(self): """ Wait for HW stats to become consistent, some devices DMA HW stats periodically so events won't be reflected until next sync. Good drivers will tell us via ethtool what their sync period is. """ if self._stats_settle_time is None: data = ethtool("-c " + self.ifname, json=True)[0] self._stats_settle_time = 0.025 + \ data.get('stats-block-usecs', 0) / 1000 / 1000 time.sleep(self._stats_settle_time)