xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision cbc7afffc5ec581d3781c49fe9c8e8c661e5217b)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4from pathlib import Path
5from lib.py import KsftSkipEx, KsftXfailEx
6from lib.py import cmd, ip
7from lib.py import NetNS, NetdevSimDev
8from .remote import Remote
9
10
11def _load_env_file(src_path):
12    env = os.environ.copy()
13
14    src_dir = Path(src_path).parent.resolve()
15    if not (src_dir / "net.config").exists():
16        return env
17
18    with open((src_dir / "net.config").as_posix(), 'r') as fp:
19        for line in fp.readlines():
20            full_file = line
21            # Strip comments
22            pos = line.find("#")
23            if pos >= 0:
24                line = line[:pos]
25            line = line.strip()
26            if not line:
27                continue
28            pair = line.split('=', maxsplit=1)
29            if len(pair) != 2:
30                raise Exception("Can't parse configuration line:", full_file)
31            env[pair[0]] = pair[1]
32    return env
33
34
35class NetDrvEnv:
36    """
37    Class for a single NIC / host env, with no remote end
38    """
39    def __init__(self, src_path):
40        self._ns = None
41
42        self.env = _load_env_file(src_path)
43
44        if 'NETIF' in self.env:
45            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
46        else:
47            self._ns = NetdevSimDev()
48            self.dev = self._ns.nsims[0].dev
49        self.ifindex = self.dev['ifindex']
50
51    def __enter__(self):
52        return self
53
54    def __exit__(self, ex_type, ex_value, ex_tb):
55        """
56        __exit__ gets called at the end of a "with" block.
57        """
58        self.__del__()
59
60    def __del__(self):
61        if self._ns:
62            self._ns.remove()
63            self._ns = None
64
65
66class NetDrvEpEnv:
67    """
68    Class for an environment with a local device and "remote endpoint"
69    which can be used to send traffic in.
70
71    For local testing it creates two network namespaces and a pair
72    of netdevsim devices.
73    """
74
75    # Network prefixes used for local tests
76    nsim_v4_pfx = "192.0.2."
77    nsim_v6_pfx = "2001:db8::"
78
79    def __init__(self, src_path, nsim_test=None):
80
81        self.env = _load_env_file(src_path)
82
83        # Things we try to destroy
84        self.remote = None
85        # These are for local testing state
86        self._netns = None
87        self._ns = None
88        self._ns_peer = None
89
90        if "NETIF" in self.env:
91            if nsim_test is True:
92                raise KsftXfailEx("Test only works on netdevsim")
93            self._check_env()
94
95            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
96
97            self.v4 = self.env.get("LOCAL_V4")
98            self.v6 = self.env.get("LOCAL_V6")
99            self.remote_v4 = self.env.get("REMOTE_V4")
100            self.remote_v6 = self.env.get("REMOTE_V6")
101            kind = self.env["REMOTE_TYPE"]
102            args = self.env["REMOTE_ARGS"]
103        else:
104            if nsim_test is False:
105                raise KsftXfailEx("Test does not work on netdevsim")
106
107            self.create_local()
108
109            self.dev = self._ns.nsims[0].dev
110
111            self.v4 = self.nsim_v4_pfx + "1"
112            self.v6 = self.nsim_v6_pfx + "1"
113            self.remote_v4 = self.nsim_v4_pfx + "2"
114            self.remote_v6 = self.nsim_v6_pfx + "2"
115            kind = "netns"
116            args = self._netns.name
117
118        self.remote = Remote(kind, args, src_path)
119
120        self.addr = self.v6 if self.v6 else self.v4
121        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
122
123        self.addr_ipver = "6" if self.v6 else "4"
124        # Bracketed addresses, some commands need IPv6 to be inside []
125        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
126        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
127
128        self.ifname = self.dev['ifname']
129        self.ifindex = self.dev['ifindex']
130
131        self._required_cmd = {}
132
133    def create_local(self):
134        self._netns = NetNS()
135        self._ns = NetdevSimDev()
136        self._ns_peer = NetdevSimDev(ns=self._netns)
137
138        with open("/proc/self/ns/net") as nsfd0, \
139             open("/var/run/netns/" + self._netns.name) as nsfd1:
140            ifi0 = self._ns.nsims[0].ifindex
141            ifi1 = self._ns_peer.nsims[0].ifindex
142            NetdevSimDev.ctrl_write('link_device',
143                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
144
145        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
146        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
147        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
148
149        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
150        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
151        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
152
153    def _check_env(self):
154        vars_needed = [
155            ["LOCAL_V4", "LOCAL_V6"],
156            ["REMOTE_V4", "REMOTE_V6"],
157            ["REMOTE_TYPE"],
158            ["REMOTE_ARGS"]
159        ]
160        missing = []
161
162        for choice in vars_needed:
163            for entry in choice:
164                if entry in self.env:
165                    break
166            else:
167                missing.append(choice)
168        # Make sure v4 / v6 configs are symmetric
169        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
170            missing.append(["LOCAL_V6", "REMOTE_V6"])
171        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
172            missing.append(["LOCAL_V4", "REMOTE_V4"])
173        if missing:
174            raise Exception("Invalid environment, missing configuration:", missing,
175                            "Please see tools/testing/selftests/drivers/net/README.rst")
176
177    def __enter__(self):
178        return self
179
180    def __exit__(self, ex_type, ex_value, ex_tb):
181        """
182        __exit__ gets called at the end of a "with" block.
183        """
184        self.__del__()
185
186    def __del__(self):
187        if self._ns:
188            self._ns.remove()
189            self._ns = None
190        if self._ns_peer:
191            self._ns_peer.remove()
192            self._ns_peer = None
193        if self._netns:
194            del self._netns
195            self._netns = None
196        if self.remote:
197            del self.remote
198            self.remote = None
199
200    def require_v4(self):
201        if not self.v4 or not self.remote_v4:
202            raise KsftSkipEx("Test requires IPv4 connectivity")
203
204    def require_v6(self):
205        if not self.v6 or not self.remote_v6:
206            raise KsftSkipEx("Test requires IPv6 connectivity")
207
208    def _require_cmd(self, comm, key, host=None):
209        cached = self._required_cmd.get(comm, {})
210        if cached.get(key) is None:
211            cached[key] = cmd("command -v -- " + comm, fail=False,
212                              shell=True, host=host).ret == 0
213        self._required_cmd[comm] = cached
214        return cached[key]
215
216    def require_cmd(self, comm, local=True, remote=False):
217        if local:
218            if not self._require_cmd(comm, "local"):
219                raise KsftSkipEx("Test requires command: " + comm)
220        if remote:
221            if not self._require_cmd(comm, "remote"):
222                raise KsftSkipEx("Test requires (remote) command: " + comm)
223