xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision f122668ddcce450c2585f0be4bf4478d6fd6176b)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4from pathlib import Path
5from lib.py import KsftSkipEx
6from lib.py import cmd, ip
7from lib.py import NetNS, NetdevSimDev
8from .remote import Remote
9
10
def _load_env_file(src_path):
    """
    Build the environment dict used by the test environments.

    Starts from a copy of os.environ and overlays KEY=VALUE pairs read
    from a "net.config" file located in the same directory as @src_path.
    If no such file exists the copy of os.environ is returned unchanged.

    File format, one entry per line:
      - everything from the first '#' to the end of the line is a comment
      - blank (or comment-only) lines are skipped
      - the line is split on the first '=', so values may contain '='
      - whitespace around the key and around the value is ignored

    Raises Exception if a non-empty line has no '=' or an empty key.
    """
    env = os.environ.copy()

    config = Path(src_path).parent.resolve() / "net.config"
    if not config.exists():
        return env

    with open(config.as_posix(), 'r') as fp:
        for raw_line in fp:
            # Strip comments
            line = raw_line.partition("#")[0].strip()
            if not line:
                continue
            key, sep, value = line.partition('=')
            # "KEY = value" must define "KEY", not "KEY " - otherwise
            # lookups such as 'NETIF' in env silently miss the entry
            key = key.strip()
            if not sep or not key:
                raise Exception("Can't parse configuration line:", raw_line)
            env[key] = value.strip()
    return env
33
34
class NetDrvEnv:
    """
    Class for a single NIC / host env, with no remote end

    If the environment (see _load_env_file()) defines NETIF, that existing
    interface is used. Otherwise a NetdevSimDev is created for the test and
    removed again on cleanup.

    Attributes:
      env      environment dict returned by _load_env_file()
      dev      link info of the device under test (dict-like, as returned
               by "ip -j link show" or by the NetdevSimDev instance)
      ifname   interface name of the device under test
      ifindex  interface index of the device under test
    """
    def __init__(self, src_path):
        """
        @src_path is the path of the test script, it is used to locate
        the net.config file.
        """
        # Set first, so that cleanup is safe even if the setup below raises
        self._ns = None

        self.env = _load_env_file(src_path)

        if 'NETIF' in self.env:
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
        else:
            self._ns = NetdevSimDev()
            self.dev = self._ns.nsims[0].dev
        # Expose the same device identifiers as NetDrvEpEnv does
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # May run twice (from __exit__ and again on garbage collection),
        # clearing the reference makes the second call a no-op.
        if self._ns:
            self._ns.remove()
            self._ns = None
64
65
class NetDrvEpEnv:
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    If the environment (see _load_env_file()) defines NETIF, the existing
    interface is used and the remote endpoint is described by the
    LOCAL_V4/LOCAL_V6, REMOTE_V4/REMOTE_V6, REMOTE_TYPE and REMOTE_ARGS
    variables.

    Otherwise, for local testing, it creates a new network namespace and
    a pair of linked netdevsim devices, one in the current namespace and
    one in the new namespace, which then acts as the remote endpoint.

    Attributes:
      env                   environment dict returned by _load_env_file()
      dev                   link info of the local device under test
      ifname, ifindex       name and index of the local device
      v4, v6                local addresses (None if not configured)
      remote_v4, remote_v6  remote addresses (None if not configured)
      addr, remote_addr     preferred address, IPv6 if available, else IPv4
      addr_ipver            "6" or "4", IP version of @addr
      baddr, remote_baddr   like @addr / @remote_addr, but IPv6 is inside []
      remote                Remote object for running commands on the peer
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path):
        """
        @src_path is the path of the test script, it is used to locate
        the net.config file and is also handed to Remote().
        """
        # Things we try to destroy. Initialized before anything that can
        # raise, so that __del__ always finds these attributes.
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None
        # Results of command availability checks, see _require_cmd()
        self._required_cmd = {}

        self.env = _load_env_file(src_path)

        if "NETIF" in self.env:
            self._check_env()
            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]

            self.v4 = self.env.get("LOCAL_V4")
            self.v6 = self.env.get("LOCAL_V6")
            self.remote_v4 = self.env.get("REMOTE_V4")
            self.remote_v6 = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.v4 = self.nsim_v4_pfx + "1"
            self.v6 = self.nsim_v6_pfx + "1"
            self.remote_v4 = self.nsim_v4_pfx + "2"
            self.remote_v6 = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 whenever it is configured
        self.addr = self.v6 if self.v6 else self.v4
        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4

        self.addr_ipver = "6" if self.v6 else "4"
        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def create_local(self):
        """
        Create the local test setup: a new netns, one netdevsim device in
        the current netns and one in the new netns, linked to each other,
        with addresses <prefix>1 (local) and <prefix>2 (peer) assigned and
        both links brought up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device identifies each end as "<netns fd>:<ifindex>", the
        # namespace files only need to stay open for the duration of the write
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        local_if = self._ns.nsims[0].ifname
        peer_if = self._ns_peer.nsims[0].ifname

        ip(f"   addr add dev {local_if} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {local_if} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {local_if} up")

        ip(f"   addr add dev {peer_if} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {peer_if} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {peer_if} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration used together with NETIF.

        Requires at least one of LOCAL_V4 / LOCAL_V6, at least one of
        REMOTE_V4 / REMOTE_V6, as well as REMOTE_TYPE and REMOTE_ARGS.
        Each address family must be configured on both ends or on neither.

        Raises Exception listing the missing groups of variables.
        """
        # Each entry is a group of alternatives, one of them must be set
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = [choice for choice in vars_needed
                   if not any(entry in self.env for entry in choice)]

        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # May run twice (from __exit__ and again on garbage collection),
        # every reference is cleared once handled so a repeat is a no-op.
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            # Drop our reference, the attribute is re-created as None
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_v4(self):
        """
        Raise KsftSkipEx unless both ends have an IPv4 address.
        """
        if not self.v4 or not self.remote_v4:
            raise KsftSkipEx("Test requires IPv4 connectivity")

    def require_v6(self):
        """
        Raise KsftSkipEx unless both ends have an IPv6 address.
        """
        if not self.v6 or not self.remote_v6:
            raise KsftSkipEx("Test requires IPv6 connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Return True if the shell can resolve command @comm ("command -v").

        @key selects the cache slot for the result ("local" / "remote"),
        so each command is probed at most once per slot. @host is passed
        through to cmd(); the local check leaves it as None.
        """
        cached = self._required_cmd.setdefault(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command @comm is not available.

        @local  check on the local system (default)
        @remote check on the remote endpoint as well
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            # Must be probed on the remote endpoint, without the host the
            # check would just repeat the local lookup
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)
217