1# 2# Copyright (c) 2025 Stormshield 3# 4# SPDX-License-Identifier: BSD-2-Clause 5# 6 7import pytest 8import socket 9import struct 10import subprocess 11import time 12from pathlib import Path 13 14from atf_python.sys.net.vnet import VnetTestTemplate 15 16 17class MRouteTestTemplate(VnetTestTemplate): 18 """ 19 Helper class for multicast routing tests. Test classes should inherit from this one. 20 """ 21 COORD_SOCK = "coord.sock" 22 23 @staticmethod 24 def _msgwait(sock: socket.socket, expected: bytes): 25 msg = sock.recv(1024) 26 assert msg == expected 27 28 @staticmethod 29 def sendmsg(msg: bytes, path: str): 30 s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 31 s.sendto(msg, path) 32 s.close() 33 34 @staticmethod 35 def _makesock(path: str): 36 s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 37 s.bind(path) 38 return s 39 40 @staticmethod 41 def mcast_join_INET6(addr: str, port: int): 42 pass 43 44 def jointest(self, vnet): 45 """Let the coordinator know that we're ready, and wait for go-ahead.""" 46 coord = self._makesock(vnet.alias + ".sock") 47 self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK) 48 self._msgwait(coord, b"join") 49 50 def donetest(self): 51 """Let the coordinator that we completed successfully.""" 52 self.sendmsg(b"done", self.COORD_SOCK) 53 54 def starttest(self, vnets: list[str]): 55 self.vnets = vnets 56 for vnet in vnets: 57 self.sendmsg(b"join", vnet + ".sock") 58 59 def waittest(self): 60 for vnet in self.vnets: 61 self._msgwait(self.coord, b"done") 62 63 def setup_method(self, method): 64 self.coord = self._makesock(self.COORD_SOCK) 65 super().setup_method(method) 66 67 # Loop until all other hosts have sent the ok message. 68 received = set() 69 vnet_names = set(self.vnet_map.keys()) - {self.vnet.alias} 70 while len(received) < len(vnet_names): 71 msg = self.coord.recv(1024) 72 received.add(msg) 73 assert received == {b"ok " + name.encode() for name in vnet_names} 74 75 76class MRouteINETTestTemplate(MRouteTestTemplate): 77 @staticmethod 78 def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0): 79 conf = f"pimd-{ident}.conf" 80 with open(conf, "w") as conf_file: 81 conf_file.write("no phyint\n") 82 for iface in ifaces: 83 conf_file.write(f"phyint {iface} enable\n") 84 conf_file.write(f"rp-address {rpaddr} {group}\n") 85 86 cmd = f"setfib {fib} pimd -i {ident} -f {conf} -p pimd-{ident}.pid -n" 87 return subprocess.Popen(cmd.split(), stdout=subprocess.DEVNULL, 88 stderr=subprocess.DEVNULL) 89 90 @staticmethod 91 def mcast_join(addr: str, port: int): 92 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 93 mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY) 94 s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 95 s.bind((addr, port)) 96 time.sleep(1) # Give the kernel a bit of time to join the group. 97 return s 98 99 @staticmethod 100 def mcast_sendto(addr: str, port: int, iface: str, msg: bytes): 101 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 102 mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY, 103 socket.if_nametoindex(iface)) 104 s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn) 105 s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64) 106 s.sendto(msg, (addr, port)) 107 s.close() 108 109 def setup_method(self, method): 110 self.require_module("ip_mroute") 111 super().setup_method(method) 112 113 114class MRouteINET6TestTemplate(MRouteTestTemplate): 115 @staticmethod 116 def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0): 117 ifaces_str = ' '.join(f"-i {iface}" for iface in ifaces) 118 exepath = Path(__file__).parent / "ip6_mrouted" 119 cmd = f"setfib {fib} {exepath} {ifaces_str}" 120 return subprocess.Popen(cmd.split(), stdout=subprocess.DEVNULL, 121 stderr=subprocess.DEVNULL) 122 123 @staticmethod 124 def mcast_join(addr: str, port: int, iface: str): 125 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 126 mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr), 127 socket.if_nametoindex(iface)) 128 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) 129 s.bind((addr, port)) 130 time.sleep(1) # Give the kernel a bit of time to join the 131 return s 132 133 @staticmethod 134 def mcast_sendto(addr: str, port: int, iface: str, msg: bytes): 135 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 136 mreq = struct.pack("i", socket.if_nametoindex(iface)) 137 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq) 138 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64) 139 s.sendto(msg, (addr, port)) 140 s.close() 141 142 def setup_method(self, method): 143 self.require_module("ip6_mroute") 144 super().setup_method(method) 145 146 147class Test1RBasicINET(MRouteINETTestTemplate): 148 """Basic multicast routing setup with 2 hosts connected via a router.""" 149 150 TOPOLOGY = { 151 "vnet_router": {"ifaces": ["if1", "if2"]}, 152 "vnet_host1": {"ifaces": ["if1"]}, 153 "vnet_host2": {"ifaces": ["if2"]}, 154 "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]}, 155 "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]}, 156 } 157 MULTICAST_ADDR = "239.0.0.1" 158 159 def setup_method(self, method): 160 # Create VNETs and start the handlers. 161 super().setup_method(method) 162 163 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]] 164 self.pimd = self.run_pimd("test", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32") 165 time.sleep(3) # Give pimd a bit of time to get itself together. 166 167 def vnet_host1_handler(self, vnet): 168 self.jointest(vnet) 169 170 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 171 172 # Wait for host 2 to send a message, then send a reply. 173 self._msgwait(self.sock, b"Hello, Multicast!") 174 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 175 b"Goodbye, Multicast!") 176 self._msgwait(self.sock, b"Goodbye, Multicast!") 177 self.donetest() 178 179 def vnet_host2_handler(self, vnet): 180 self.jointest(vnet) 181 182 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 183 184 # Send a message to host 1, then wait for a reply. 185 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 186 b"Hello, Multicast!") 187 self._msgwait(self.sock, b"Hello, Multicast!") 188 self._msgwait(self.sock, b"Goodbye, Multicast!") 189 self.donetest() 190 191 @pytest.mark.require_user("root") 192 @pytest.mark.require_progs(["pimd"]) 193 @pytest.mark.timeout(30) 194 def test(self): 195 self.starttest(["vnet_host1", "vnet_host2"]) 196 self.waittest() 197 198 199class Test1RCrissCrossINET(MRouteINETTestTemplate): 200 """ 201 Test a router connected to four hosts, with pairs of interfaces 202 in different FIBs. 203 """ 204 205 TOPOLOGY = { 206 "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]}, 207 "vnet_host1": {"ifaces": ["if1"]}, 208 "vnet_host2": {"ifaces": ["if2"]}, 209 "vnet_host3": {"ifaces": ["if3"]}, 210 "vnet_host4": {"ifaces": ["if4"]}, 211 "if1": { 212 "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")], 213 "prefixes6": [], 214 "fib": (0, 0), 215 }, 216 "if2": { 217 "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")], 218 "prefixes6": [], 219 "fib": (0, 0), 220 }, 221 "if3": { 222 "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")], 223 "prefixes6": [], 224 "fib": (1, 0), 225 }, 226 "if4": { 227 "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")], 228 "prefixes6": [], 229 "fib": (1, 0), 230 }, 231 } 232 MULTICAST_ADDR = "239.0.0.1" 233 234 def setup_method(self, method): 235 # Create VNETs and start the handlers. 236 super().setup_method(method) 237 238 # Start a pimd instance per FIB. 239 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]] 240 self.pimd0 = self.run_pimd("test0", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32", 241 fib=0) 242 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]] 243 self.pimd1 = self.run_pimd("test1", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32", 244 fib=1) 245 time.sleep(3) # Give pimd a bit of time to get itself together. 246 247 def vnet_host1_handler(self, vnet): 248 self.jointest(vnet) 249 250 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 251 self._msgwait(self.sock, b"Hello, Multicast on FIB 0!") 252 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 253 b"Goodbye, Multicast on FIB 0!") 254 self.donetest() 255 256 def vnet_host2_handler(self, vnet): 257 self.jointest(vnet) 258 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 259 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 260 b"Hello, Multicast on FIB 0!") 261 self._msgwait(self.sock, b"Hello, Multicast on FIB 0!") 262 self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!") 263 self.donetest() 264 265 def vnet_host3_handler(self, vnet): 266 self.jointest(vnet) 267 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 268 self._msgwait(self.sock, b"Hello, Multicast on FIB 1!") 269 self.mcast_sendto(self.MULTICAST_ADDR, 12345, 270 vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!") 271 self.donetest() 272 273 def vnet_host4_handler(self, vnet): 274 self.jointest(vnet) 275 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345) 276 time.sleep(1) 277 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 278 b"Hello, Multicast on FIB 1!") 279 self._msgwait(self.sock, b"Hello, Multicast on FIB 1!") 280 self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!") 281 self.donetest() 282 283 @pytest.mark.require_user("root") 284 @pytest.mark.require_progs(["pimd"]) 285 @pytest.mark.timeout(30) 286 def test(self): 287 self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]) 288 self.waittest() 289 290 291class Test1RBasicINET6(MRouteINET6TestTemplate): 292 """Basic multicast routing setup with 2 hosts connected via a router.""" 293 294 TOPOLOGY = { 295 "vnet_router": {"ifaces": ["if1", "if2"]}, 296 "vnet_host1": {"ifaces": ["if1"]}, 297 "vnet_host2": {"ifaces": ["if2"]}, 298 "if1": { 299 "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")] 300 }, 301 "if2": { 302 "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")] 303 }, 304 } 305 MULTICAST_ADDR = "ff05::1" 306 307 def setup_method(self, method): 308 # Create VNETs and start the handlers. 309 super().setup_method(method) 310 311 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]] 312 self.mrouted = self.run_ip6_mrouted("test", ifaces) 313 time.sleep(1) 314 315 def vnet_host1_handler(self, vnet): 316 self.jointest(vnet) 317 318 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 319 320 # Wait for host 2 to send a message, then send a reply. 321 self._msgwait(self.sock, b"Hello, Multicast!") 322 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 323 b"Goodbye, Multicast!") 324 self._msgwait(self.sock, b"Goodbye, Multicast!") 325 self.donetest() 326 327 def vnet_host2_handler(self, vnet): 328 self.jointest(vnet) 329 330 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 331 332 # Send a message to host 1, then wait for a reply. 333 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 334 b"Hello, Multicast!") 335 self._msgwait(self.sock, b"Hello, Multicast!") 336 self._msgwait(self.sock, b"Goodbye, Multicast!") 337 self.donetest() 338 339 @pytest.mark.require_user("root") 340 @pytest.mark.timeout(30) 341 def test(self): 342 self.starttest(["vnet_host1", "vnet_host2"]) 343 self.waittest() 344 345 346class Test1RCrissCrossINET6(MRouteINET6TestTemplate): 347 """ 348 Test a router connected to four hosts, with pairs of interfaces 349 in different FIBs. 350 """ 351 352 TOPOLOGY = { 353 "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]}, 354 "vnet_host1": {"ifaces": ["if1"]}, 355 "vnet_host2": {"ifaces": ["if2"]}, 356 "vnet_host3": {"ifaces": ["if3"]}, 357 "vnet_host4": {"ifaces": ["if4"]}, 358 "if1": { 359 "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")], 360 "fib": (0, 0), 361 }, 362 "if2": { 363 "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")], 364 "fib": (0, 0), 365 }, 366 "if3": { 367 "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")], 368 "fib": (1, 0), 369 }, 370 "if4": { 371 "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")], 372 "fib": (1, 0), 373 }, 374 } 375 MULTICAST_ADDR = "ff05::1" 376 377 def setup_method(self, method): 378 # Create VNETs and start the handlers. 379 super().setup_method(method) 380 381 # Start an ip6_mrouted instance per FIB. 382 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]] 383 self.pimd0 = self.run_ip6_mrouted("test0", ifaces, fib=0) 384 ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]] 385 self.pimd1 = self.run_ip6_mrouted("test1", ifaces, fib=1) 386 time.sleep(1) # Give ip6_mrouted a bit of time to get itself together. 387 388 def vnet_host1_handler(self, vnet): 389 self.jointest(vnet) 390 391 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 392 self._msgwait(self.sock, b"Hello, Multicast on FIB 0!") 393 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 394 b"Goodbye, Multicast on FIB 0!") 395 self.donetest() 396 397 def vnet_host2_handler(self, vnet): 398 self.jointest(vnet) 399 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 400 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 401 b"Hello, Multicast on FIB 0!") 402 self._msgwait(self.sock, b"Hello, Multicast on FIB 0!") 403 self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!") 404 self.donetest() 405 406 def vnet_host3_handler(self, vnet): 407 self.jointest(vnet) 408 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 409 self._msgwait(self.sock, b"Hello, Multicast on FIB 1!") 410 self.mcast_sendto(self.MULTICAST_ADDR, 12345, 411 vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!") 412 self.donetest() 413 414 def vnet_host4_handler(self, vnet): 415 self.jointest(vnet) 416 self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name) 417 time.sleep(1) 418 self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, 419 b"Hello, Multicast on FIB 1!") 420 self._msgwait(self.sock, b"Hello, Multicast on FIB 1!") 421 self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!") 422 self.donetest() 423 424 @pytest.mark.require_user("root") 425 @pytest.mark.timeout(30) 426 def test(self): 427 self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]) 428 self.waittest() 429