xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision f2ad904e923f70a80f478febf001f88dfd65a64c)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
13def _load_env_file(src_path):
14    env = os.environ.copy()
15
16    src_dir = Path(src_path).parent.resolve()
17    if not (src_dir / "net.config").exists():
18        return ksft_setup(env)
19
20    with open((src_dir / "net.config").as_posix(), 'r') as fp:
21        for line in fp.readlines():
22            full_file = line
23            # Strip comments
24            pos = line.find("#")
25            if pos >= 0:
26                line = line[:pos]
27            line = line.strip()
28            if not line:
29                continue
30            pair = line.split('=', maxsplit=1)
31            if len(pair) != 2:
32                raise Exception("Can't parse configuration line:", full_file)
33            env[pair[0]] = pair[1]
34    return ksft_setup(env)
35
36
37class NetDrvEnv:
38    """
39    Class for a single NIC / host env, with no remote end
40    """
41    def __init__(self, src_path, **kwargs):
42        self._ns = None
43
44        self.env = _load_env_file(src_path)
45
46        if 'NETIF' in self.env:
47            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
48        else:
49            self._ns = NetdevSimDev(**kwargs)
50            self.dev = self._ns.nsims[0].dev
51        self.ifname = self.dev['ifname']
52        self.ifindex = self.dev['ifindex']
53
54    def __enter__(self):
55        ip(f"link set dev {self.dev['ifname']} up")
56
57        return self
58
59    def __exit__(self, ex_type, ex_value, ex_tb):
60        """
61        __exit__ gets called at the end of a "with" block.
62        """
63        self.__del__()
64
65    def __del__(self):
66        if self._ns:
67            self._ns.remove()
68            self._ns = None
69
70
71class NetDrvEpEnv:
72    """
73    Class for an environment with a local device and "remote endpoint"
74    which can be used to send traffic in.
75
76    For local testing it creates two network namespaces and a pair
77    of netdevsim devices.
78    """
79
80    # Network prefixes used for local tests
81    nsim_v4_pfx = "192.0.2."
82    nsim_v6_pfx = "2001:db8::"
83
84    def __init__(self, src_path, nsim_test=None):
85
86        self.env = _load_env_file(src_path)
87
88        self._stats_settle_time = None
89
90        # Things we try to destroy
91        self.remote = None
92        # These are for local testing state
93        self._netns = None
94        self._ns = None
95        self._ns_peer = None
96
97        if "NETIF" in self.env:
98            if nsim_test is True:
99                raise KsftXfailEx("Test only works on netdevsim")
100            self._check_env()
101
102            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
103
104            self.v4 = self.env.get("LOCAL_V4")
105            self.v6 = self.env.get("LOCAL_V6")
106            self.remote_v4 = self.env.get("REMOTE_V4")
107            self.remote_v6 = self.env.get("REMOTE_V6")
108            kind = self.env["REMOTE_TYPE"]
109            args = self.env["REMOTE_ARGS"]
110        else:
111            if nsim_test is False:
112                raise KsftXfailEx("Test does not work on netdevsim")
113
114            self.create_local()
115
116            self.dev = self._ns.nsims[0].dev
117
118            self.v4 = self.nsim_v4_pfx + "1"
119            self.v6 = self.nsim_v6_pfx + "1"
120            self.remote_v4 = self.nsim_v4_pfx + "2"
121            self.remote_v6 = self.nsim_v6_pfx + "2"
122            kind = "netns"
123            args = self._netns.name
124
125        self.remote = Remote(kind, args, src_path)
126
127        self.addr = self.v6 if self.v6 else self.v4
128        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
129
130        self.addr_ipver = "6" if self.v6 else "4"
131        # Bracketed addresses, some commands need IPv6 to be inside []
132        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
133        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
134
135        self.ifname = self.dev['ifname']
136        self.ifindex = self.dev['ifindex']
137
138        self._required_cmd = {}
139
140    def create_local(self):
141        self._netns = NetNS()
142        self._ns = NetdevSimDev()
143        self._ns_peer = NetdevSimDev(ns=self._netns)
144
145        with open("/proc/self/ns/net") as nsfd0, \
146             open("/var/run/netns/" + self._netns.name) as nsfd1:
147            ifi0 = self._ns.nsims[0].ifindex
148            ifi1 = self._ns_peer.nsims[0].ifindex
149            NetdevSimDev.ctrl_write('link_device',
150                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
151
152        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
153        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
154        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
155
156        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
157        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
158        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
159
160    def _check_env(self):
161        vars_needed = [
162            ["LOCAL_V4", "LOCAL_V6"],
163            ["REMOTE_V4", "REMOTE_V6"],
164            ["REMOTE_TYPE"],
165            ["REMOTE_ARGS"]
166        ]
167        missing = []
168
169        for choice in vars_needed:
170            for entry in choice:
171                if entry in self.env:
172                    break
173            else:
174                missing.append(choice)
175        # Make sure v4 / v6 configs are symmetric
176        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
177            missing.append(["LOCAL_V6", "REMOTE_V6"])
178        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
179            missing.append(["LOCAL_V4", "REMOTE_V4"])
180        if missing:
181            raise Exception("Invalid environment, missing configuration:", missing,
182                            "Please see tools/testing/selftests/drivers/net/README.rst")
183
184    def __enter__(self):
185        return self
186
187    def __exit__(self, ex_type, ex_value, ex_tb):
188        """
189        __exit__ gets called at the end of a "with" block.
190        """
191        self.__del__()
192
193    def __del__(self):
194        if self._ns:
195            self._ns.remove()
196            self._ns = None
197        if self._ns_peer:
198            self._ns_peer.remove()
199            self._ns_peer = None
200        if self._netns:
201            del self._netns
202            self._netns = None
203        if self.remote:
204            del self.remote
205            self.remote = None
206
207    def require_v4(self):
208        if not self.v4 or not self.remote_v4:
209            raise KsftSkipEx("Test requires IPv4 connectivity")
210
211    def require_v6(self):
212        if not self.v6 or not self.remote_v6:
213            raise KsftSkipEx("Test requires IPv6 connectivity")
214
215    def _require_cmd(self, comm, key, host=None):
216        cached = self._required_cmd.get(comm, {})
217        if cached.get(key) is None:
218            cached[key] = cmd("command -v -- " + comm, fail=False,
219                              shell=True, host=host).ret == 0
220        self._required_cmd[comm] = cached
221        return cached[key]
222
223    def require_cmd(self, comm, local=True, remote=False):
224        if local:
225            if not self._require_cmd(comm, "local"):
226                raise KsftSkipEx("Test requires command: " + comm)
227        if remote:
228            if not self._require_cmd(comm, "remote"):
229                raise KsftSkipEx("Test requires (remote) command: " + comm)
230
231    def wait_hw_stats_settle(self):
232        """
233        Wait for HW stats to become consistent, some devices DMA HW stats
234        periodically so events won't be reflected until next sync.
235        Good drivers will tell us via ethtool what their sync period is.
236        """
237        if self._stats_settle_time is None:
238            data = {}
239            try:
240                data = ethtool("-c " + self.ifname, json=True)[0]
241            except CmdExitFailure as e:
242                if "Operation not supported" not in e.cmd.stderr:
243                    raise
244
245            self._stats_settle_time = 0.025 + \
246                data.get('stats-block-usecs', 0) / 1000 / 1000
247
248        time.sleep(self._stats_settle_time)
249