xref: /freebsd/tests/sys/netinet/ip_mroute.py (revision 3b16e96b005c723717aa6a4ebc48000354e64fa1)
1#
2# Copyright (c) 2025 Stormshield
3#
4# SPDX-License-Identifier: BSD-2-Clause
5#
6
7import pytest
8import socket
9import struct
10import subprocess
11import time
12from pathlib import Path
13
14from atf_python.sys.net.vnet import VnetTestTemplate
15
16
17class MRouteTestTemplate(VnetTestTemplate):
18    """
19    Helper class for multicast routing tests.  Test classes should inherit from this one.
20    """
21    COORD_SOCK = "coord.sock"
22
23    @staticmethod
24    def _msgwait(sock: socket.socket, expected: bytes, timeout=None):
25        if timeout is not None:
26            sock.settimeout(timeout)
27        msg = sock.recv(1024)
28        assert msg == expected
29
30    @staticmethod
31    def sendmsg(msg: bytes, path: str):
32        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
33        s.sendto(msg, path)
34        s.close()
35
36    @staticmethod
37    def _makesock(path: str):
38        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
39        s.bind(path)
40        return s
41
42    @staticmethod
43    def mcast_join_INET6(addr: str, port: int):
44        pass
45
46    def jointest(self, vnet):
47        """Let the coordinator know that we're ready, and wait for go-ahead."""
48        coord = self._makesock(vnet.alias + ".sock")
49        self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK)
50        self._msgwait(coord, b"join")
51
52    def donetest(self):
53        """Let the coordinator that we completed successfully."""
54        self.sendmsg(b"done", self.COORD_SOCK)
55
56    def starttest(self, vnets: list[str]):
57        self.vnets = vnets
58        for vnet in vnets:
59            self.sendmsg(b"join", vnet + ".sock")
60
61    def waittest(self):
62        for vnet in self.vnets:
63            self._msgwait(self.coord, b"done")
64
65    def setup_method(self, method):
66        self.coord = self._makesock(self.COORD_SOCK)
67        super().setup_method(method)
68
69        # Loop until all other hosts have sent the ok message.
70        received = set()
71        vnet_names = set(self.vnet_map.keys()) - {self.vnet.alias}
72        while len(received) < len(vnet_names):
73            msg = self.coord.recv(1024)
74            received.add(msg)
75        assert received == {b"ok " + name.encode() for name in vnet_names}
76
77
78class MRouteINETTestTemplate(MRouteTestTemplate):
79    @staticmethod
80    def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0):
81        conf = f"pimd-{ident}.conf"
82        with open(conf, "w") as conf_file:
83            conf_file.write("no phyint\n")
84            for iface in ifaces:
85                conf_file.write(f"phyint {iface} enable\n")
86            conf_file.write(f"rp-address {rpaddr} {group}\n")
87
88        cmd = f"setfib {fib} pimd -i {ident} -f {conf} -p pimd-{ident}.pid -n"
89        return subprocess.Popen(cmd.split(), stdout=subprocess.DEVNULL,
90                                stderr=subprocess.DEVNULL)
91
92    @staticmethod
93    def mcast_join(addr: str, port: int):
94        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
95        mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY)
96        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
97        s.bind((addr, port))
98        time.sleep(1)  # Give the kernel a bit of time to join the group.
99        return s
100
101    @staticmethod
102    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
103        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
104        mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY,
105                            socket.if_nametoindex(iface))
106        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn)
107        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
108        s.sendto(msg, (addr, port))
109        s.close()
110
111    def setup_method(self, method):
112        self.require_module("ip_mroute")
113        super().setup_method(method)
114
115
116class MRouteINET6TestTemplate(MRouteTestTemplate):
117    @staticmethod
118    def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0):
119        ifaces_str = ' '.join(f"-i {iface}" for iface in ifaces)
120        exepath = Path(__file__).parent / "ip6_mrouted"
121        cmd = f"setfib {fib} {exepath} {ifaces_str}"
122        return subprocess.Popen(cmd.split(), stdout=subprocess.DEVNULL,
123                                stderr=subprocess.DEVNULL)
124
125    @staticmethod
126    def mcast_join(addr: str, port: int, iface: str):
127        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
128        mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr),
129                            socket.if_nametoindex(iface))
130        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
131        s.bind((addr, port))
132        time.sleep(1)  # Give the kernel a bit of time to join the
133        return s
134
135    @staticmethod
136    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
137        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
138        mreq = struct.pack("i", socket.if_nametoindex(iface))
139        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq)
140        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64)
141        s.sendto(msg, (addr, port))
142        s.close()
143
144    def setup_method(self, method):
145        self.require_module("ip6_mroute")
146        super().setup_method(method)
147
148
149class Test1RBasicINET(MRouteINETTestTemplate):
150    """Basic multicast routing setup with 2 hosts connected via a router."""
151
152    TOPOLOGY = {
153        "vnet_router": {"ifaces": ["if1", "if2"]},
154        "vnet_host1": {"ifaces": ["if1"]},
155        "vnet_host2": {"ifaces": ["if2"]},
156        "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]},
157        "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]},
158    }
159    MULTICAST_ADDR = "239.0.0.1"
160
161    def setup_method(self, method):
162        # Create VNETs and start the handlers.
163        super().setup_method(method)
164
165        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
166        self.pimd = self.run_pimd("test", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32")
167        time.sleep(3)  # Give pimd a bit of time to get itself together.
168
169    def vnet_host1_handler(self, vnet):
170        self.jointest(vnet)
171
172        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
173
174        # Wait for host 2 to send a message, then send a reply.
175        self._msgwait(self.sock, b"Hello, Multicast!")
176        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
177                          b"Goodbye, Multicast!")
178        self._msgwait(self.sock, b"Goodbye, Multicast!")
179        self.donetest()
180
181    def vnet_host2_handler(self, vnet):
182        self.jointest(vnet)
183
184        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
185
186        # Send a message to host 1, then wait for a reply.
187        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
188                          b"Hello, Multicast!")
189        self._msgwait(self.sock, b"Hello, Multicast!")
190        self._msgwait(self.sock, b"Goodbye, Multicast!")
191        self.donetest()
192
193    @pytest.mark.require_user("root")
194    @pytest.mark.require_progs(["pimd"])
195    def test(self):
196        self.starttest(["vnet_host1", "vnet_host2"])
197        self.waittest()
198
199
200class MRouteINETCrissCrossTestTemplate(MRouteINETTestTemplate):
201    TOPOLOGY = {
202        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
203        "vnet_host1": {"ifaces": ["if1"]},
204        "vnet_host2": {"ifaces": ["if2"]},
205        "vnet_host3": {"ifaces": ["if3"]},
206        "vnet_host4": {"ifaces": ["if4"]},
207        "if1": {
208            "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")],
209            "prefixes6": [],
210            "fib": (0, 0),
211        },
212        "if2": {
213            "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")],
214            "prefixes6": [],
215            "fib": (0, 0),
216        },
217        "if3": {
218            "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")],
219            "prefixes6": [],
220            "fib": (1, 0),
221        },
222        "if4": {
223            "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")],
224            "prefixes6": [],
225            "fib": (1, 0),
226        },
227    }
228    MULTICAST_ADDR = "239.0.0.1"
229
230
231
232class Test1RCrissCrossINET(MRouteINETCrissCrossTestTemplate):
233    """
234    Test a router connected to four hosts, with pairs of interfaces
235    in different FIBs.
236    """
237
238    def setup_method(self, method):
239        # Create VNETs and start the handlers.
240        super().setup_method(method)
241
242        # Start a pimd instance per FIB.
243        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
244        self.pimd0 = self.run_pimd("test0", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32",
245                                   fib=0)
246        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]]
247        self.pimd1 = self.run_pimd("test1", ifaces, "127.0.0.1", self.MULTICAST_ADDR + "/32",
248                                   fib=1)
249        time.sleep(3)  # Give pimd a bit of time to get itself together.
250
251    def vnet_host1_handler(self, vnet):
252        self.jointest(vnet)
253
254        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
255        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
256        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
257                          b"Goodbye, Multicast on FIB 0!")
258        self.donetest()
259
260    def vnet_host2_handler(self, vnet):
261        self.jointest(vnet)
262        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
263        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
264                          b"Hello, Multicast on FIB 0!")
265        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
266        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
267        self.donetest()
268
269    def vnet_host3_handler(self, vnet):
270        self.jointest(vnet)
271        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
272        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
273        self.mcast_sendto(self.MULTICAST_ADDR, 12345,
274                          vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!")
275        self.donetest()
276
277    def vnet_host4_handler(self, vnet):
278        self.jointest(vnet)
279        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
280        time.sleep(1)
281        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
282                          b"Hello, Multicast on FIB 1!")
283        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
284        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
285        self.donetest()
286
287    @pytest.mark.require_user("root")
288    @pytest.mark.require_progs(["pimd"])
289    def test(self):
290        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
291        self.waittest()
292
293
294class Test1RCrissCrossINETMissingRouter(MRouteINETCrissCrossTestTemplate):
295    """
296    Test what happens when a router is configured for some FIBs but not others.
297    """
298
299    def setup_method(self, method):
300        # Create VNETs and start the handlers.
301        super().setup_method(method)
302
303        # Only start a pimd instance in FIB 0.
304        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
305        self.pimd0 = self.run_pimd("test0", ifaces, "127.0.0.1",
306                                   self.MULTICAST_ADDR + "/32", fib=0)
307
308        time.sleep(3)  # Give pimd a bit of time to get itself together.
309
310    def vnet_host1_handler(self, vnet):
311        self.jointest(vnet)
312
313        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
314        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
315        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
316                          b"Goodbye, Multicast on FIB 0!")
317        self.donetest()
318
319    def vnet_host2_handler(self, vnet):
320        self.jointest(vnet)
321        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
322        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
323                          b"Hello, Multicast on FIB 0!")
324        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
325        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
326        self.donetest()
327
328    def vnet_host3_handler(self, vnet):
329        self.jointest(vnet)
330        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
331        timedout = False
332        try:
333            self._msgwait(self.sock, b"Hello, Multicast on FIB 1!", timeout=5)
334        except socket.timeout:
335            timedout = True
336        assert timedout, "Received a message when we shouldn't have"
337        self.donetest()
338
339    def vnet_host4_handler(self, vnet):
340        self.jointest(vnet)
341        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
342        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
343                          b"Hello, Multicast on FIB 1!")
344        self.donetest()
345
346    @pytest.mark.require_user("root")
347    @pytest.mark.require_progs(["pimd"])
348    def test(self):
349        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
350        self.waittest()
351
352class Test1RBasicINET6(MRouteINET6TestTemplate):
353    """Basic multicast routing setup with 2 hosts connected via a router."""
354
355    TOPOLOGY = {
356        "vnet_router": {"ifaces": ["if1", "if2"]},
357        "vnet_host1": {"ifaces": ["if1"]},
358        "vnet_host2": {"ifaces": ["if2"]},
359        "if1": {
360            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")]
361        },
362        "if2": {
363            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")]
364        },
365    }
366    MULTICAST_ADDR = "ff05::1"
367
368    def setup_method(self, method):
369        # Create VNETs and start the handlers.
370        super().setup_method(method)
371
372        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
373        self.mrouted = self.run_ip6_mrouted("test", ifaces)
374        time.sleep(1)
375
376    def vnet_host1_handler(self, vnet):
377        self.jointest(vnet)
378
379        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
380
381        # Wait for host 2 to send a message, then send a reply.
382        self._msgwait(self.sock, b"Hello, Multicast!")
383        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
384                          b"Goodbye, Multicast!")
385        self._msgwait(self.sock, b"Goodbye, Multicast!")
386        self.donetest()
387
388    def vnet_host2_handler(self, vnet):
389        self.jointest(vnet)
390
391        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
392
393        # Send a message to host 1, then wait for a reply.
394        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
395                          b"Hello, Multicast!")
396        self._msgwait(self.sock, b"Hello, Multicast!")
397        self._msgwait(self.sock, b"Goodbye, Multicast!")
398        self.donetest()
399
400    @pytest.mark.require_user("root")
401    def test(self):
402        self.starttest(["vnet_host1", "vnet_host2"])
403        self.waittest()
404
405
406class MRouteINET6CrissCrossTestTemplate(MRouteINET6TestTemplate):
407    TOPOLOGY = {
408        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
409        "vnet_host1": {"ifaces": ["if1"]},
410        "vnet_host2": {"ifaces": ["if2"]},
411        "vnet_host3": {"ifaces": ["if3"]},
412        "vnet_host4": {"ifaces": ["if4"]},
413        "if1": {
414            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")],
415            "fib": (0, 0),
416        },
417        "if2": {
418            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")],
419            "fib": (0, 0),
420        },
421        "if3": {
422            "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")],
423            "fib": (1, 0),
424        },
425        "if4": {
426            "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")],
427            "fib": (1, 0),
428        },
429    }
430    MULTICAST_ADDR = "ff05::1"
431
432
433class Test1RCrissCrossINET6MissingRouter(MRouteINET6CrissCrossTestTemplate):
434    """
435    Test what happens when a router is configured for some FIBs but not others.
436    """
437
438    def setup_method(self, method):
439        # Create VNETs and start the handlers.
440        super().setup_method(method)
441
442        # Only start an ip6_mrouted instance in FIB 0.
443        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
444        self.mrouted0 = self.run_ip6_mrouted("test0", ifaces, fib=0)
445        time.sleep(1)  # Give ip6_mrouted a bit of time to get itself together.
446
447    def vnet_host1_handler(self, vnet):
448        self.jointest(vnet)
449
450        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
451        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
452        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
453                          b"Goodbye, Multicast on FIB 0!")
454        self.donetest()
455
456    def vnet_host2_handler(self, vnet):
457        self.jointest(vnet)
458        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
459        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
460                          b"Hello, Multicast on FIB 0!")
461        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
462        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
463        self.donetest()
464
465    def vnet_host3_handler(self, vnet):
466        self.jointest(vnet)
467
468        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
469                                    vnet.ifaces[0].name)
470        timedout = False
471        try:
472            self._msgwait(self.sock, b"Hello, Multicast on FIB 1!", timeout=5)
473        except socket.timeout:
474            timedout = True
475        assert timedout, "Received a message when we shouldn't have"
476        self.donetest()
477
478    def vnet_host4_handler(self, vnet):
479        self.jointest(vnet)
480
481        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345,
482                                    vnet.ifaces[0].name)
483        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
484                          b"Hello, Multicast on FIB 1!")
485        self.donetest()
486
487    @pytest.mark.require_user("root")
488    def test(self):
489        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
490        self.waittest()
491
492
493class Test1RCrissCrossINET6(MRouteINET6CrissCrossTestTemplate):
494    """
495    Test a router connected to four hosts, with pairs of interfaces
496    in different FIBs.
497    """
498
499    def setup_method(self, method):
500        # Create VNETs and start the handlers.
501        super().setup_method(method)
502
503        # Start an ip6_mrouted instance per FIB.
504        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if1", "if2"]]
505        self.pimd0 = self.run_ip6_mrouted("test0", ifaces, fib=0)
506        ifaces = [self.vnet.iface_alias_map[i].name for i in ["if3", "if4"]]
507        self.pimd1 = self.run_ip6_mrouted("test1", ifaces, fib=1)
508        time.sleep(1)  # Give ip6_mrouted a bit of time to get itself together.
509
510    def vnet_host1_handler(self, vnet):
511        self.jointest(vnet)
512
513        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
514        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
515        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
516                          b"Goodbye, Multicast on FIB 0!")
517        self.donetest()
518
519    def vnet_host2_handler(self, vnet):
520        self.jointest(vnet)
521        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
522        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
523                          b"Hello, Multicast on FIB 0!")
524        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
525        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
526        self.donetest()
527
528    def vnet_host3_handler(self, vnet):
529        self.jointest(vnet)
530        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
531        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
532        self.mcast_sendto(self.MULTICAST_ADDR, 12345,
533                          vnet.ifaces[0].name, b"Goodbye, Multicast on FIB 1!")
534        self.donetest()
535
536    def vnet_host4_handler(self, vnet):
537        self.jointest(vnet)
538        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
539        time.sleep(1)
540        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name,
541                          b"Hello, Multicast on FIB 1!")
542        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
543        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
544        self.donetest()
545
546    @pytest.mark.require_user("root")
547    def test(self):
548        self.starttest(["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"])
549        self.waittest()
550