xref: /linux/tools/testing/selftests/drivers/net/lib/py/env.py (revision 1880f272d2f9ef2c65a78e80ede235b3123075fc)
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import shlex
5from pathlib import Path
6from lib.py import ip
7from lib.py import NetNS, NetdevSimDev
8from .remote import Remote
9
10
11def _load_env_file(src_path):
12    env = os.environ.copy()
13
14    src_dir = Path(src_path).parent.resolve()
15    if not (src_dir / "net.config").exists():
16        return env
17
18    lexer = shlex.shlex(open((src_dir / "net.config").as_posix(), 'r').read())
19    k = None
20    for token in lexer:
21        if k is None:
22            k = token
23            env[k] = ""
24        elif token == "=":
25            pass
26        else:
27            env[k] = token
28            k = None
29    return env
30
31
32class NetDrvEnv:
33    """
34    Class for a single NIC / host env, with no remote end
35    """
36    def __init__(self, src_path):
37        self._ns = None
38
39        self.env = _load_env_file(src_path)
40
41        if 'NETIF' in self.env:
42            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
43        else:
44            self._ns = NetdevSimDev()
45            self.dev = self._ns.nsims[0].dev
46        self.ifindex = self.dev['ifindex']
47
48    def __enter__(self):
49        return self
50
51    def __exit__(self, ex_type, ex_value, ex_tb):
52        """
53        __exit__ gets called at the end of a "with" block.
54        """
55        self.__del__()
56
57    def __del__(self):
58        if self._ns:
59            self._ns.remove()
60            self._ns = None
61
62
63class NetDrvEpEnv:
64    """
65    Class for an environment with a local device and "remote endpoint"
66    which can be used to send traffic in.
67
68    For local testing it creates two network namespaces and a pair
69    of netdevsim devices.
70    """
71
72    # Network prefixes used for local tests
73    nsim_v4_pfx = "192.0.2."
74    nsim_v6_pfx = "2001:db8::"
75
76    def __init__(self, src_path):
77
78        self.env = _load_env_file(src_path)
79
80        # Things we try to destroy
81        self.remote = None
82        # These are for local testing state
83        self._netns = None
84        self._ns = None
85        self._ns_peer = None
86
87        if "NETIF" in self.env:
88            self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
89
90            self.v4 = self.env.get("LOCAL_V4")
91            self.v6 = self.env.get("LOCAL_V6")
92            self.remote_v4 = self.env.get("REMOTE_V4")
93            self.remote_v6 = self.env.get("REMOTE_V6")
94            kind = self.env["REMOTE_TYPE"]
95            args = self.env["REMOTE_ARGS"]
96        else:
97            self.create_local()
98
99            self.dev = self._ns.nsims[0].dev
100
101            self.v4 = self.nsim_v4_pfx + "1"
102            self.v6 = self.nsim_v6_pfx + "1"
103            self.remote_v4 = self.nsim_v4_pfx + "2"
104            self.remote_v6 = self.nsim_v6_pfx + "2"
105            kind = "netns"
106            args = self._netns.name
107
108        self.remote = Remote(kind, args, src_path)
109
110        self.addr = self.v6 if self.v6 else self.v4
111        self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
112
113        self.ifname = self.dev['ifname']
114        self.ifindex = self.dev['ifindex']
115
116    def create_local(self):
117        self._netns = NetNS()
118        self._ns = NetdevSimDev()
119        self._ns_peer = NetdevSimDev(ns=self._netns)
120
121        with open("/proc/self/ns/net") as nsfd0, \
122             open("/var/run/netns/" + self._netns.name) as nsfd1:
123            ifi0 = self._ns.nsims[0].ifindex
124            ifi1 = self._ns_peer.nsims[0].ifindex
125            NetdevSimDev.ctrl_write('link_device',
126                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
127
128        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
129        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
130        ip(f"   link set dev {self._ns.nsims[0].ifname} up")
131
132        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
133        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
134        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
135
136    def __enter__(self):
137        return self
138
139    def __exit__(self, ex_type, ex_value, ex_tb):
140        """
141        __exit__ gets called at the end of a "with" block.
142        """
143        self.__del__()
144
145    def __del__(self):
146        if self._ns:
147            self._ns.remove()
148            self._ns = None
149        if self._ns_peer:
150            self._ns_peer.remove()
151            self._ns_peer = None
152        if self._netns:
153            del self._netns
154            self._netns = None
155        if self.remote:
156            del self.remote
157            self.remote = None
158