Lines Matching full:self
52 def __init__(self, iface_alias: str, iface_name: str): argument
53 self.name = iface_name
54 self.alias = iface_alias
55 self.vnet_name = ""
56 self.jailed = False
57 self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
58 self.prefixes4: List[List[str]] = []
59 self.prefixes6: List[List[str]] = []
61 self.iftype = self.IFT_LOOP
63 self.iftype = self.IFT_ETHER
66 def ifindex(self): argument
67 return socket.if_nametoindex(self.name)
70 def first_ipv6(self): argument
71 d = self.addr_map["inet6"]
75 def first_ipv4(self): argument
76 d = self.addr_map["inet"]
79 def set_vnet(self, vnet_name: str): argument
80 self.vnet_name = vnet_name
82 def set_jailed(self, jailed: bool): argument
83 self.jailed = jailed
85 def run_cmd(self, cmd, verbose=False): argument
86 if self.vnet_name and not self.jailed:
87 cmd = "/usr/sbin/jexec {} {}".format(self.vnet_name, cmd)
107 def setup_addr(self, _addr: str): argument
111 cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
114 if self.addr_map[family]:
115 cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr)
117 cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
118 self.run_cmd(cmd)
119 self.addr_map[family][str(addr.ip)] = addr
121 def delete_addr(self, _addr: str): argument
125 cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
128 cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
129 self.run_cmd(cmd)
130 del self.addr_map[family][str(addr)]
132 def turn_up(self): argument
133 cmd = "/sbin/ifconfig {} up".format(self.name)
134 self.run_cmd(cmd)
136 def enable_ipv6(self): argument
137 cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
138 self.run_cmd(cmd)
140 def has_tentative(self) -> bool: argument
142 cmd = "/sbin/ifconfig {} inet6".format(self.name)
143 out = self.run_cmd(cmd, verbose=False)
154 def __init__(self): argument
155 self.file_name = self.INTERFACES_FNAME
157 def _register_iface(self, iface_name: str): argument
158 with open(self.file_name, "a") as f:
161 def _list_ifaces(self) -> List[str]: argument
164 with open(self.file_name, "r") as f:
171 def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: argument
174 if not self.is_autodeleted(iface.name):
175 self._register_iface(iface.name)
185 def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]: argument
191 if not self.is_autodeleted(iface_name):
192 if iface_name not in self._list_ifaces():
199 def cleanup(self): argument
201 os.unlink(self.INTERFACES_FNAME)
208 self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] argument
210 self.name = vnet_name
211 self.alias = vnet_alias # reference in the test topology
212 self.jid = jid
213 self.ifaces = ifaces
214 self.iface_alias_map = {} # iface.alias: iface
215 self.iface_map = {} # iface.name: iface
219 self.iface_alias_map[iface.alias] = iface
220 self.iface_map[iface.name] = iface
222 setattr(self, iface.alias, iface)
223 self.need_dad = False # Disable duplicate address detection by default
224 self.attached = False
225 self.pipe = None
226 self.subprocess = None
228 def run_vnet_cmd(self, cmd, verbose=True): argument
229 if not self.attached:
230 cmd = "/usr/sbin/jexec {} {}".format(self.name, cmd)
233 def disable_dad(self): argument
234 self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")
236 def set_pipe(self, pipe): argument
237 self.pipe = pipe
239 def set_subprocess(self, p): argument
240 self.subprocess = p
248 def attach(self): argument
249 self.attach_jid(self.jid)
250 self.attached = True
256 def __init__(self, topology_id: str): argument
257 self.topology_id = topology_id
258 self.file_name = self.JAILS_FNAME
259 self._vnets: List[str] = []
261 def _register_vnet(self, vnet_name: str): argument
262 self._vnets.append(vnet_name)
263 with open(self.file_name, "a") as f:
281 def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): argument
282 vnet_name = "pytest:{}".format(convert_test_name(self.topology_id))
283 if self._vnets:
285 vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
297 self._register_vnet(vnet_name)
302 not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
309 def cleanup(self): argument
312 with open(self.file_name) as f:
317 os.unlink(self.JAILS_FNAME)
330 topo_map: Dict # self.TOPOLOGY
337 def _require_default_modules(self): argument
339 self.require_module("if_epair")
341 def _get_vnet_handler(self, vnet_alias: str): argument
343 return getattr(self, handler_name, None)
345 def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): argument
380 handler = self._get_vnet_handler(vnet.alias)
385 self.drop_privileges()
388 def _get_topo_ifmap(self, topo: Dict): argument
405 def setup_topology(self, topo: Dict, topology_id: str): argument
409 iface_map = self._get_topo_ifmap(topo)
422 setattr(self, obj_name, vnet)
427 handler = self._get_vnet_handler(vnet.alias)
455 def setup_method(self, _method): argument
458 self._require_default_modules()
461 topology_id = get_topology_id(self.test_id)
462 topology = self.TOPOLOGY
464 obj_map = self.setup_topology(topology, topology_id)
467 if self._get_vnet_handler(vnet_alias):
471 target=self._setup_vnet,
487 self.vnet = main_vnet
488 self._setup_vnet(main_vnet, obj_map, None)
490 self.iface_map = obj_map.iface_map
491 self.vnet_map = obj_map.vnet_map
492 self.drop_privileges()
494 def cleanup(self, test_id: str): argument
496 topology_id = get_topology_id(self.test_id)
503 def wait_object(self, pipe, timeout=5): argument
508 def wait_objects_any(self, pipe_list, timeout=5): argument
514 def send_object(self, pipe, obj): argument
517 def wait(self): argument
522 def curvnet(self): argument
531 def _setup_default_topology(self): argument
535 "if1": {"type": self.IFTYPE, "prefixes4": [], "prefixes6": []},
538 for prefix in self.IPV6_PREFIXES:
540 for prefix in self.IPV4_PREFIXES:
544 def setup_method(self, method): argument
545 if not getattr(self, "TOPOLOGY", None):
546 self.TOPOLOGY = self._setup_default_topology()
548 names = self.TOPOLOGY.keys()