xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision 72c19df24a3e352b5eefd6657cf8d4051ccf77c5)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import shlex
5from pathlib import Path
6from lib.py import KsftSkipEx
7from lib.py import cmd, ip
8from lib.py import NetNS, NetdevSimDev
9from .remote import Remote
10
11
12def _load_env_file(src_path):
13    env = os.environ.copy()
14
15    src_dir = Path(src_path).parent.resolve()
16    if not (src_dir / "net.config").exists():
17        return env
18
19    lexer = shlex.shlex(open((src_dir / "net.config").as_posix(), 'r').read())
20    k = None
21    for token in lexer:
22        if k is None:
23            k = token
24            env[k] = ""
25        elif token == "=":
26            pass
27        else:
28            env[k] = token
29            k = None
30    return env
31
32
33class NetDrvEnv:
34    """
35    Class for a single NIC / host env, with no remote end
36    """
37    def __init__(self, src_path):
38        self._ns = None
39
40        self.env = _load_env_file(src_path)
41
42        if 'NETIF' in self.env:
43            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
44        else:
45            self._ns = NetdevSimDev()
46            self.dev = self._ns.nsims[0].dev
47        self.ifindex = self.dev['ifindex']
48
49    def __enter__(self):
50        return self
51
52    def __exit__(self, ex_type, ex_value, ex_tb):
53        """
54        __exit__ gets called at the end of a "with" block.
55        """
56        self.__del__()
57
58    def __del__(self):
59        if self._ns:
60            self._ns.remove()
61            self._ns = None
62
63
64class NetDrvEpEnv:
65    """
66    Class for an environment with a local device and "remote endpoint"
67    which can be used to send traffic in.
68
69    For local testing it creates two network namespaces and a pair
70    of netdevsim devices.
71    """
72
73    # Network prefixes used for local tests
74    nsim_v4_pfx = "192.0.2."
75    nsim_v6_pfx = "2001:db8::"
76
77    def __init__(self, src_path):
78
79        self.env = _load_env_file(src_path)
80
81        # Things we try to destroy
82        self.remote = None
83        # These are for local testing state
84        self._netns = None
85        self._ns = None
86        self._ns_peer = None
87
88        if "NETIF" in self.env:
89            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
90
91            self.v4 = self.env.get("LOCAL_V4")
92            self.v6 = self.env.get("LOCAL_V6")
93            self.remote_v4 = self.env.get("REMOTE_V4")
94            self.remote_v6 = self.env.get("REMOTE_V6")
95            kind = self.env["REMOTE_TYPE"]
96            args = self.env["REMOTE_ARGS"]
97        else:
98            self.create_local()
99
100            self.dev = self._ns.nsims[0].dev
101
102            self.v4 = self.nsim_v4_pfx + "1"
103            self.v6 = self.nsim_v6_pfx + "1"
104            self.remote_v4 = self.nsim_v4_pfx + "2"
105            self.remote_v6 = self.nsim_v6_pfx + "2"
106            kind = "netns"
107            args = self._netns.name
108
109        self.remote = Remote(kind, args, src_path)
110
111        self.addr = self.v6 if self.v6 else self.v4
112        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
113
114        self.addr_ipver = "6" if self.v6 else "4"
115        # Bracketed addresses, some commands need IPv6 to be inside []
116        self.baddr = f"[{self.v6}]" if self.v6 else self.v4
117        self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
118
119        self.ifname = self.dev['ifname']
120        self.ifindex = self.dev['ifindex']
121
122        self._required_cmd = {}
123
124    def create_local(self):
125        self._netns = NetNS()
126        self._ns = NetdevSimDev()
127        self._ns_peer = NetdevSimDev(ns=self._netns)
128
129        with open("/proc/self/ns/net") as nsfd0, \
130             open("/var/run/netns/" + self._netns.name) as nsfd1:
131            ifi0 = self._ns.nsims[0].ifindex
132            ifi1 = self._ns_peer.nsims[0].ifindex
133            NetdevSimDev.ctrl_write('link_device',
134                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
135
136        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
137        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
138        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
139
140        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
141        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
142        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
143
144    def __enter__(self):
145        return self
146
147    def __exit__(self, ex_type, ex_value, ex_tb):
148        """
149        __exit__ gets called at the end of a "with" block.
150        """
151        self.__del__()
152
153    def __del__(self):
154        if self._ns:
155            self._ns.remove()
156            self._ns = None
157        if self._ns_peer:
158            self._ns_peer.remove()
159            self._ns_peer = None
160        if self._netns:
161            del self._netns
162            self._netns = None
163        if self.remote:
164            del self.remote
165            self.remote = None
166
167    def require_v4(self):
168        if not self.v4 or not self.remote_v4:
169            raise KsftSkipEx("Test requires IPv4 connectivity")
170
171    def require_v6(self):
172        if not self.v6 or not self.remote_v6:
173            raise KsftSkipEx("Test requires IPv6 connectivity")
174
175    def _require_cmd(self, comm, key, host=None):
176        cached = self._required_cmd.get(comm, {})
177        if cached.get(key) is None:
178            cached[key] = cmd("command -v -- " + comm, fail=False,
179                              shell=True, host=host).ret == 0
180        self._required_cmd[comm] = cached
181        return cached[key]
182
183    def require_cmd(self, comm, local=True, remote=False):
184        if local:
185            if not self._require_cmd(comm, "local"):
186                raise KsftSkipEx("Test requires command: " + comm)
187        if remote:
188            if not self._require_cmd(comm, "remote"):
189                raise KsftSkipEx("Test requires (remote) command: " + comm)
190