Lines Matching refs:self
21 def __init__(self, src_path):
22 self.src_path = Path(src_path)
23 self.test_dir = self.src_path.parent.resolve()
24 self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()
26 self.env = self._load_env_file()
28 def _load_env_file(self):
31 src_dir = Path(self.src_path).parent.resolve()
56 def __init__(self, src_path, nsim_test=None, **kwargs):
59 self._ns = None
61 if 'NETIF' in self.env:
65 self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
70 self._ns = NetdevSimDev(**kwargs)
71 self.dev = self._ns.nsims[0].dev
72 self.ifname = self.dev['ifname']
73 self.ifindex = self.dev['ifindex']
75 def __enter__(self):
76 ip(f"link set dev {self.dev['ifname']} up")
78 return self
80 def __exit__(self, ex_type, ex_value, ex_tb):
84 self.__del__()
86 def __del__(self):
87 if self._ns:
88 self._ns.remove()
89 self._ns = None
105 def __init__(self, src_path, nsim_test=None):
108 self._stats_settle_time = None
111 self.remote = None
113 self._netns = None
114 self._ns = None
115 self._ns_peer = None
117 self.addr_v = { "4": None, "6": None }
118 self.remote_addr_v = { "4": None, "6": None }
120 if "NETIF" in self.env:
123 self._check_env()
125 self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
127 self.addr_v["4"] = self.env.get("LOCAL_V4")
128 self.addr_v["6"] = self.env.get("LOCAL_V6")
129 self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
130 self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
131 kind = self.env["REMOTE_TYPE"]
132 args = self.env["REMOTE_ARGS"]
137 self.create_local()
139 self.dev = self._ns.nsims[0].dev
141 self.addr_v["4"] = self.nsim_v4_pfx + "1"
142 self.addr_v["6"] = self.nsim_v6_pfx + "1"
143 self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
144 self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
146 args = self._netns.name
148 self.remote = Remote(kind, args, src_path)
150 self.addr_ipver = "6" if self.addr_v["6"] else "4"
151 self.addr = self.addr_v[self.addr_ipver]
152 self.remote_addr = self.remote_addr_v[self.addr_ipver]
155 self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
156 self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]
158 self.ifname = self.dev['ifname']
159 self.ifindex = self.dev['ifindex']
162 self.remote_ifname = self.resolve_remote_ifc()
164 self._required_cmd = {}
166 def create_local(self):
167 self._netns = NetNS()
168 self._ns = NetdevSimDev()
169 self._ns_peer = NetdevSimDev(ns=self._netns)
171 with open("/proc/self/ns/net") as nsfd0, \
172 open("/var/run/netns/" + self._netns.name) as nsfd1:
173 ifi0 = self._ns.nsims[0].ifindex
174 ifi1 = self._ns_peer.nsims[0].ifindex
178 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
179 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
180 ip(f" link set dev {self._ns.nsims[0].ifname} up")
182 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
183 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
184 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
186 def _check_env(self):
197 if entry in self.env:
202 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
204 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
210 def resolve_remote_ifc(self):
212 if self.remote_addr_v["4"]:
213 v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
214 if self.remote_addr_v["6"]:
215 v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
222 def __enter__(self):
223 return self
225 def __exit__(self, ex_type, ex_value, ex_tb):
229 self.__del__()
231 def __del__(self):
232 if self._ns:
233 self._ns.remove()
234 self._ns = None
235 if self._ns_peer:
236 self._ns_peer.remove()
237 self._ns_peer = None
238 if self._netns:
239 del self._netns
240 self._netns = None
241 if self.remote:
242 del self.remote
243 self.remote = None
245 def require_ipver(self, ipver):
246 if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
249 def _require_cmd(self, comm, key, host=None):
250 cached = self._required_cmd.get(comm, {})
254 self._required_cmd[comm] = cached
257 def require_cmd(self, comm, local=True, remote=False):
259 if not self._require_cmd(comm, "local"):
262 if not self._require_cmd(comm, "remote"):
265 def wait_hw_stats_settle(self):
271 if self._stats_settle_time is None:
274 data = ethtool("-c " + self.ifname, json=True)[0]
279 self._stats_settle_time = 0.025 + \
282 time.sleep(self._stats_settle_time)