#
# Copyright (c) 2025 Stormshield
#
# SPDX-License-Identifier: BSD-2-Clause
#

import pytest
import socket
import struct
import subprocess
import time
from pathlib import Path

from atf_python.sys.net.vnet import VnetTestTemplate


class MRouteTestTemplate(VnetTestTemplate):
    """
    Helper class for multicast routing tests.  Test classes should inherit
    from this one.

    The test body and the per-vnet handlers synchronise through unix datagram
    sockets: handlers report to COORD_SOCK, and each handler listens on
    "<vnet alias>.sock" for the go-ahead.
    """
    COORD_SOCK = "coord.sock"

    @staticmethod
    def _msgwait(sock: socket.socket, expected: bytes, timeout=None):
        """
        Receive one datagram from `sock` and assert that it equals `expected`.

        If `timeout` is given it only applies to this receive; the socket's
        previous timeout is restored afterwards.  socket.timeout propagates to
        the caller when nothing arrives in time.
        """
        if timeout is None:
            msg = sock.recv(1024)
        else:
            previous = sock.gettimeout()
            sock.settimeout(timeout)
            try:
                msg = sock.recv(1024)
            finally:
                # Do not leave the caller's socket permanently reconfigured.
                sock.settimeout(previous)
        assert msg == expected, f"expected {expected!r}, received {msg!r}"

    @staticmethod
    def sendmsg(msg: bytes, path: str):
        """Send `msg` as a single datagram to the unix socket bound at `path`."""
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.sendto(msg, path)

    @staticmethod
    def _makesock(path: str):
        """Return a unix datagram socket bound to `path`."""
        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            s.bind(path)
        except OSError:
            s.close()
            raise
        return s

    @staticmethod
    def mcast_join_INET6(addr: str, port: int):
        """Placeholder: does nothing and is not referenced in this file."""

    def jointest(self, vnet):
        """Let the coordinator know that we're ready, and wait for go-ahead."""
        # Bind our own socket before announcing ourselves, so that the "join"
        # message sent by starttest() has somewhere to land.
        with self._makesock(vnet.alias + ".sock") as coord:
            self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK)
            self._msgwait(coord, b"join")

    def donetest(self):
        """Let the coordinator know that we completed successfully."""
        self.sendmsg(b"done", self.COORD_SOCK)

    def starttest(self, vnets: list[str]):
        """Send the go-ahead to the handler of every vnet alias in `vnets`."""
        self.vnets = vnets
        for vnet in vnets:
            self.sendmsg(b"join", vnet + ".sock")

    def waittest(self):
        """Wait for one "done" message per vnet passed to starttest()."""
        for _ in self.vnets:
            self._msgwait(self.coord, b"done")

    def setup_method(self, method):
        """
        Bind the coordinator socket, run the base class setup, and wait until
        every vnet other than our own has reported in with "ok <alias>".
        """
        # The coordinator socket is bound before the base class setup because
        # handlers report to it from jointest().
        self.coord = self._makesock(self.COORD_SOCK)
        super().setup_method(method)

        # Loop until all other hosts have sent the ok message.
        vnet_names = set(self.vnet_map.keys()) - {self.vnet.alias}
        expected = {b"ok " + name.encode() for name in vnet_names}
        received = set()
        while received != expected:
            msg = self.coord.recv(1024)
            # Fail right away rather than after the loop has finished.
            assert msg in expected, f"unexpected coordinator message {msg!r}"
            received.add(msg)


class MRouteINETTestTemplate(MRouteTestTemplate):
    """Helpers for IPv4 multicast routing tests; requires ip_mroute."""

    @staticmethod
    def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0):
        """
        Write "pimd-<ident>.conf" enabling only `ifaces`, with `rpaddr` as the
        rendezvous point for `group`, and start pimd under setfib `fib`.

        Returns the subprocess.Popen object; output is discarded.
        """
        conf = f"pimd-{ident}.conf"
        with open(conf, "w") as conf_file:
            conf_file.write("no phyint\n")
            for iface in ifaces:
                conf_file.write(f"phyint {iface} enable\n")
            conf_file.write(f"rp-address {rpaddr} {group}\n")

        # Build argv directly instead of splitting a formatted string, so an
        # argument containing whitespace stays a single argument.
        argv = [
            "setfib", str(fib),
            "pimd", "-i", ident, "-f", conf, "-p", f"pimd-{ident}.pid", "-n",
        ]
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int):
        """
        Return a UDP socket that has joined the IPv4 group `addr` and is bound
        to (`addr`, `port`).
        """
        # Membership request: the group address followed by INADDR_ANY.
        mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            s.bind((addr, port))
        except OSError:
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """
        Send `msg` to the IPv4 group (`addr`, `port`) out of interface `iface`
        with a TTL of 64.
        """
        # Two INADDR_ANY fields followed by the outgoing interface index.
        mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY,
                            socket.if_nametoindex(iface))
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                           socket.IPPROTO_UDP) as s:
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip_mroute module, then run the common setup."""
        self.require_module("ip_mroute")
        super().setup_method(method)


class MRouteINET6TestTemplate(MRouteTestTemplate):
    """Helpers for IPv6 multicast routing tests; requires ip6_mroute."""

    @staticmethod
    def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0):
        """
        Start the ip6_mrouted helper located next to this file under setfib
        `fib`, passing one "-i <iface>" pair per entry of `ifaces`.

        `ident` is accepted for symmetry with run_pimd() but is not used.
        Returns the subprocess.Popen object; output is discarded.
        """
        exepath = Path(__file__).parent / "ip6_mrouted"
        # Build argv directly so a path containing whitespace is not split.
        argv = ["setfib", str(fib), str(exepath)]
        for iface in ifaces:
            argv += ["-i", iface]
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int, iface: str):
        """
        Return a UDP socket that has joined the IPv6 group `addr` on interface
        `iface` and is bound to (`addr`, `port`).
        """
        # Membership request: the group address followed by the interface index.
        mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr),
                           socket.if_nametoindex(iface))
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
            s.bind((addr, port))
        except OSError:
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """
        Send `msg` to the IPv6 group (`addr`, `port`) out of interface `iface`
        with a hop limit of 64.
        """
        mreq = struct.pack("i", socket.if_nametoindex(iface))
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM,
                           socket.IPPROTO_UDP) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq)
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip6_mroute module, then run the common setup."""
        self.require_module("ip6_mroute")
        super().setup_method(method)


class Test1RBasicINET(MRouteINETTestTemplate):
    """Basic multicast routing setup with 2 hosts connected via a router."""

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]},
        "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]},
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Set up the topology and start pimd on both router interfaces."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.pimd = self.run_pimd("test", ifaces, "127.0.0.1",
                                  self.MULTICAST_ADDR + "/32")
        time.sleep(3)  # Give pimd a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Join the group, wait for host 2's greeting and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)

        # Wait for host 2 to send a message, then send a reply.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast!")
        # The reply is also expected on our own group socket.
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Join the group, greet host 1 and wait for its reply."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)

        # Send a message to host 1, then wait for a reply.
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast!")
        # Our own greeting is expected on the group socket before the reply.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    def test(self):
        """Release both host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2"])
        self.waittest()


class MRouteINETCrissCrossTestTemplate(MRouteINETTestTemplate):
    """IPv4 topology: one router, four hosts, interfaces split over two FIBs."""

    # NOTE(review): the "fib" tuples presumably give (router side, host side);
    # confirm against VnetTestTemplate.
    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "239.0.0.1"


class Test1RCrissCrossINET(MRouteINETCrissCrossTestTemplate):
    """
    Test a router connected to four hosts, with pairs of interfaces
    in different FIBs.
    """

    def setup_method(self, method):
        """Set up the topology and start one pimd per FIB."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        # Start a pimd instance per FIB.
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.pimd0 = self.run_pimd("test0", ifaces, "127.0.0.1",
                                   self.MULTICAST_ADDR + "/32", fib=0)
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]]
        self.pimd1 = self.run_pimd("test1", ifaces, "127.0.0.1",
                                   self.MULTICAST_ADDR + "/32", fib=1)
        time.sleep(3)  # Give pimd a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Wait for the FIB 0 greeting from host 2 and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the FIB 0 greeting and wait for host 1's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """Wait for the FIB 1 greeting from host 4 and answer it."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345,
                          vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """Send the FIB 1 greeting and wait for host 3's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        # NOTE(review): presumably extra slack so host 3 has joined before we
        # send; confirm.
        time.sleep(1)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    def test(self):
        """Release all four host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
        self.waittest()


class Test1RCrissCrossINETMissingRouter(MRouteINETCrissCrossTestTemplate):
    """
    Test what happens when a router is configured for some FIBs but not others.
    """

    def setup_method(self, method):
        """Set up the topology and start pimd for FIB 0 only."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        # Only start a pimd instance in FIB 0.
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.pimd0 = self.run_pimd("test0", ifaces, "127.0.0.1",
                                   self.MULTICAST_ADDR + "/32", fib=0)

        time.sleep(3)  # Give pimd a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Wait for the FIB 0 greeting from host 2 and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the FIB 0 greeting and wait for host 1's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """Check that host 4's FIB 1 greeting does not arrive within 5s."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        timedout = False
        try:
            self._msgwait(self.sock, b"Hello, Multicast on FIB 1!", timeout=5)
        except socket.timeout:
            timedout = True
        assert timedout, "Received a message when we shouldn't have"
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """Send the FIB 1 greeting; no reply is waited for."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    def test(self):
        """Release all four host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
        self.waittest()


class Test1RBasicINET6(MRouteINET6TestTemplate):
    """Basic multicast routing setup with 2 hosts connected via a router."""

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")]
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")]
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Set up the topology and start ip6_mrouted on both interfaces."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.mrouted = self.run_ip6_mrouted("test", ifaces)
        time.sleep(1)  # Give ip6_mrouted a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Join the group, wait for host 2's greeting and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)

        # Wait for host 2 to send a message, then send a reply.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast!")
        # The reply is also expected on our own group socket.
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Join the group, greet host 1 and wait for its reply."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)

        # Send a message to host 1, then wait for a reply.
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast!")
        # Our own greeting is expected on the group socket before the reply.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    def test(self):
        """Release both host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2"])
        self.waittest()


class MRouteINET6CrissCrossTestTemplate(MRouteINET6TestTemplate):
    """IPv6 topology: one router, four hosts, interfaces split over two FIBs."""

    # NOTE(review): the "fib" tuples presumably give (router side, host side);
    # confirm against VnetTestTemplate.
    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "ff05::1"


class Test1RCrissCrossINET6MissingRouter(MRouteINET6CrissCrossTestTemplate):
    """
    Test what happens when a router is configured for some FIBs but not others.
    """

    def setup_method(self, method):
        """Set up the topology and start ip6_mrouted for FIB 0 only."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        # Only start an ip6_mrouted instance in FIB 0.
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.mrouted0 = self.run_ip6_mrouted("test0", ifaces, fib=0)
        time.sleep(1)  # Give ip6_mrouted a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Wait for the FIB 0 greeting from host 2 and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the FIB 0 greeting and wait for host 1's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """Check that host 4's FIB 1 greeting does not arrive within 5s."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        timedout = False
        try:
            self._msgwait(self.sock, b"Hello, Multicast on FIB 1!", timeout=5)
        except socket.timeout:
            timedout = True
        assert timedout, "Received a message when we shouldn't have"
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """Send the FIB 1 greeting; no reply is waited for."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    def test(self):
        """Release all four host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
        self.waittest()


class Test1RCrissCrossINET6(MRouteINET6CrissCrossTestTemplate):
    """
    Test a router connected to four hosts, with pairs of interfaces
    in different FIBs.
    """

    def setup_method(self, method):
        """Set up the topology and start one ip6_mrouted per FIB."""
        # Create VNETs and start the handlers.
        super().setup_method(method)

        # Start an ip6_mrouted instance per FIB.
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
        self.mrouted0 = self.run_ip6_mrouted("test0", ifaces, fib=0)
        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]]
        self.mrouted1 = self.run_ip6_mrouted("test1", ifaces, fib=1)
        time.sleep(1)  # Give ip6_mrouted a bit of time to get itself together.

    def vnet_host1_handler(self, vnet):
        """Wait for the FIB 0 greeting from host 2 and answer it."""
        self.jointest(vnet)

        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the FIB 0 greeting and wait for host 1's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """Wait for the FIB 1 greeting from host 4 and answer it."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self.mcast_sendto(self.MULTICAST_ADDR, 12345,
                          vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """Send the FIB 1 greeting and wait for host 3's reply."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
                                    vnet.ifaces[0].name)
        # NOTE(review): presumably extra slack so host 3 has joined before we
        # send; confirm.
        time.sleep(1)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    def test(self):
        """Release all four host handlers and wait for them to finish."""
        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
        self.waittest()