xref: /freebsd/tests/sys/netinet/ip_mroute.py (revision 09e702ad40af0067017613070b42d72cbc2bec3a)
1#
2# Copyright (c) 2025 Stormshield
3#
4# SPDX-License-Identifier: BSD-2-Clause
5#
6
7import pytest
8import socket
9import struct
10import subprocess
11import time
12from pathlib import Path
13
14from atf_python.sys.net.vnet import VnetTestTemplate
15
16
class MRouteTestTemplate(VnetTestTemplate):
    """
    Helper class for multicast routing tests.  Test classes should inherit from this one.

    The coordinator (the side that runs setup_method() and the test method)
    and the per-VNET handlers synchronise over AF_UNIX datagram sockets bound
    to relative paths:

    - each handler binds "<alias>.sock", sends b"ok <alias>" to COORD_SOCK
      and blocks until it receives b"join" (see jointest());
    - the coordinator releases the handlers with starttest() and then collects
      one b"done" per handler with waittest().
    """
    # Path of the socket on which the coordinator receives "ok"/"done".
    COORD_SOCK = "coord.sock"

    @staticmethod
    def _msgwait(sock: socket.socket, expected: bytes):
        """Block for one datagram on *sock* and assert that it equals *expected*."""
        msg = sock.recv(1024)
        assert msg == expected, f"expected {expected!r}, received {msg!r}"

    @staticmethod
    def sendmsg(msg: bytes, path: str):
        """Send *msg* as a single datagram to the AF_UNIX socket bound at *path*."""
        # The context manager closes the socket even when sendto() raises,
        # e.g. because nothing is bound at *path* yet.
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.sendto(msg, path)

    @staticmethod
    def _makesock(path: str):
        """Return a new AF_UNIX datagram socket bound to *path*."""
        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            s.bind(path)
        except OSError:
            # Do not leak the descriptor when the bind fails.
            s.close()
            raise
        return s

    @staticmethod
    def mcast_join_INET6(addr: str, port: int):
        """
        Placeholder that does nothing and returns None.

        No caller is visible in this file; the IPv6 join used by the tests is
        MRouteINET6TestTemplate.mcast_join().
        """
        pass

    def jointest(self, vnet):
        """Let the coordinator know that we're ready, and wait for go-ahead."""
        # The rendezvous socket is only needed for the single "join" message,
        # so it is closed as soon as that message has been checked.
        with self._makesock(vnet.alias + ".sock") as coord:
            self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK)
            self._msgwait(coord, b"join")

    def donetest(self):
        """Let the coordinator know that we completed successfully."""
        self.sendmsg(b"done", self.COORD_SOCK)

    def starttest(self, vnets: list[str]):
        """Send the go-ahead to every handler named in *vnets* and remember the list."""
        self.vnets = vnets
        for vnet in vnets:
            self.sendmsg(b"join", vnet + ".sock")

    def waittest(self):
        """Wait for one b"done" message per VNET passed to starttest()."""
        for _ in self.vnets:
            self._msgwait(self.coord, b"done")

    def setup_method(self, method):
        """Bind the coordinator socket, run the parent setup and wait for every handler."""
        # Bind first: the socket has to exist before the handlers send their
        # "ok" message (subclasses note that the parent setup creates the
        # VNETs and starts the handlers).
        self.coord = self._makesock(self.COORD_SOCK)
        super().setup_method(method)

        # Loop until all other hosts have sent the ok message.
        received = set()
        vnet_names = set(self.vnet_map.keys()) - {self.vnet.alias}
        while len(received) < len(vnet_names):
            received.add(self.coord.recv(1024))
        assert received == {b"ok " + name.encode() for name in vnet_names}
74
75
class MRouteINETTestTemplate(MRouteTestTemplate):
    """IPv4 helpers: a pimd launcher plus multicast join/send primitives."""

    @staticmethod
    def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0):
        """
        Write a pimd configuration file and start pimd under setfib.

        The configuration is written to "pimd-<ident>.conf" in the current
        directory: all interfaces are disabled, each name in *ifaces* is
        re-enabled, and *rpaddr* is declared as the rendezvous point for
        *group*.  The pid file passed to pimd is "pimd-<ident>.pid".

        Returns the subprocess.Popen object of the started process; its
        stdout and stderr are discarded.
        """
        conf = f"pimd-{ident}.conf"
        lines = ["no phyint\n"]
        lines.extend(f"phyint {iface} enable\n" for iface in ifaces)
        lines.append(f"rp-address {rpaddr} {group}\n")
        with open(conf, "w") as conf_file:
            conf_file.writelines(lines)

        # Build argv directly rather than splitting a formatted string, so an
        # argument containing whitespace stays a single argument.
        argv = ["setfib", str(fib), "pimd", "-i", ident, "-f", conf,
                "-p", f"pimd-{ident}.pid", "-n"]
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int):
        """
        Return a UDP socket that joined group *addr* and is bound to (*addr*, *port*).

        The membership request carries INADDR_ANY as the interface address.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Group address followed by the interface address.
            mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            s.bind((addr, port))
        except Exception:
            # Do not leak the descriptor when the join or the bind fails.
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send *msg* to (*addr*, *port*) out of interface *iface* with a TTL of 64."""
        # The context manager closes the socket even if an option or the send fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as s:
            # Two zero addresses followed by the interface index; the outgoing
            # interface is therefore selected by index only.
            mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY,
                                socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip_mroute kernel module, then run the parent setup."""
        self.require_module("ip_mroute")
        super().setup_method(method)
112
113
class MRouteINET6TestTemplate(MRouteTestTemplate):
    """IPv6 helpers: an ip6_mrouted launcher plus multicast join/send primitives."""

    @staticmethod
    def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0):
        """
        Start the ip6_mrouted helper located next to this file, under setfib.

        Each name in *ifaces* is passed as a separate "-i <iface>" option.
        *ident* is accepted for symmetry with the callers but is not used.

        Returns the subprocess.Popen object of the started process; its
        stdout and stderr are discarded.
        """
        exepath = Path(__file__).parent / "ip6_mrouted"
        # Build argv directly rather than splitting a formatted string: the
        # executable path is absolute and may contain whitespace.
        argv = ["setfib", str(fib), str(exepath)]
        for iface in ifaces:
            argv.extend(("-i", iface))
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int, iface: str):
        """
        Return a UDP socket that joined group *addr* on interface *iface*.

        The socket is bound to (*addr*, *port*).
        """
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Group address followed by the interface index.
            mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr),
                               socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
            s.bind((addr, port))
        except Exception:
            # Do not leak the descriptor when the join or the bind fails.
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send *msg* to (*addr*, *port*) out of interface *iface* with a hop limit of 64."""
        # The context manager closes the socket even if an option or the send fails.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as s:
            # The outgoing interface is given as a bare interface index.
            mreq = struct.pack("i", socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq)
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip6_mroute kernel module, then run the parent setup."""
        self.require_module("ip6_mroute")
        super().setup_method(method)
145
146
class Test1RBasicINET(MRouteINETTestTemplate):
    """
    Two hosts on different IPv4 subnets exchange multicast through one router.

    pimd is started on the router's two interfaces; host 2 opens the
    conversation and host 1 answers.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]},
        "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]},
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Bring up the topology, then start pimd on both router interfaces."""
        # The parent creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        group_prefix = f"{self.MULTICAST_ADDR}/32"
        self.pimd = self.run_pimd("test", router_ifaces, "127.0.0.1", group_prefix)
        # pimd needs a moment before it is ready.
        time.sleep(3)

    def vnet_host1_handler(self, vnet):
        """Responder: wait for host 2's greeting, then answer it."""
        group, port = self.MULTICAST_ADDR, 12345
        self.jointest(vnet)

        listener = self.mcast_join(group, port)
        self.sock = listener

        self._msgwait(listener, b"Hello, Multicast!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name, b"Goodbye, Multicast!")
        # The reply is expected on this host's own socket as well.
        self._msgwait(listener, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Initiator: greet host 1, then wait for the answer."""
        group, port = self.MULTICAST_ADDR, 12345
        self.jointest(vnet)

        listener = self.mcast_join(group, port)
        self.sock = listener

        self.mcast_sendto(group, port, vnet.ifaces[0].name, b"Hello, Multicast!")
        # First our own greeting comes back, then host 1's reply.
        self._msgwait(listener, b"Hello, Multicast!")
        self._msgwait(listener, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release both host handlers and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
197
198
class Test1RCrissCrossINET(MRouteINETTestTemplate):
    """
    One router, four hosts, and two pairs of interfaces in different FIBs.

    Hosts 1 and 2 talk to each other over if1/if2, hosts 3 and 4 over
    if3/if4; one pimd instance is started per pair.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Bring up the topology, then start one pimd per FIB."""
        # The parent creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map
        group_prefix = f"{self.MULTICAST_ADDR}/32"

        fib0_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        self.pimd0 = self.run_pimd("test0", fib0_ifaces, "127.0.0.1", group_prefix,
                                   fib=0)
        fib1_ifaces = [alias_map[alias].name for alias in ("if3", "if4")]
        self.pimd1 = self.run_pimd("test1", fib1_ifaces, "127.0.0.1", group_prefix,
                                   fib=1)
        # pimd needs a moment before it is ready.
        time.sleep(3)

    def _run_responder(self, vnet, hello: bytes, goodbye: bytes):
        """Join the group, wait for *hello*, answer with *goodbye*, report done."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(self.sock, hello)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, goodbye)
        self.donetest()

    def _run_initiator(self, vnet, hello: bytes, goodbye: bytes, pause=0):
        """
        Join the group, send *hello*, then expect *hello* and *goodbye* in that order.

        When *pause* is non-zero, sleep that many seconds between joining and sending.
        """
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        if pause:
            time.sleep(pause)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, hello)
        self._msgwait(self.sock, hello)
        self._msgwait(self.sock, goodbye)
        self.donetest()

    def vnet_host1_handler(self, vnet):
        """Responder of the FIB 0 pair."""
        self._run_responder(vnet, b"Hello, Multicast on FIB 0!",
                            b"Goodbye, Multicast on FIB 0!")

    def vnet_host2_handler(self, vnet):
        """Initiator of the FIB 0 pair."""
        self._run_initiator(vnet, b"Hello, Multicast on FIB 0!",
                            b"Goodbye, Multicast on FIB 0!")

    def vnet_host3_handler(self, vnet):
        """Responder of the FIB 1 pair."""
        self._run_responder(vnet, b"Hello, Multicast on FIB 1!",
                            b"Goodbye, Multicast on FIB 1!")

    def vnet_host4_handler(self, vnet):
        """Initiator of the FIB 1 pair; waits an extra second before sending."""
        self._run_initiator(vnet, b"Hello, Multicast on FIB 1!",
                            b"Goodbye, Multicast on FIB 1!", pause=1)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four host handlers and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()
289
290
class Test1RBasicINET6(MRouteINET6TestTemplate):
    """
    Two hosts on different IPv6 prefixes exchange multicast through one router.

    ip6_mrouted is started on the router's two interfaces; host 2 opens the
    conversation and host 1 answers.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")]
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")]
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Bring up the topology, then start ip6_mrouted on both router interfaces."""
        # The parent creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        self.mrouted = self.run_ip6_mrouted("test", router_ifaces)
        # Short pause after launching the daemon.
        time.sleep(1)

    def vnet_host1_handler(self, vnet):
        """Responder: wait for host 2's greeting, then answer it."""
        group, port = self.MULTICAST_ADDR, 12345
        self.jointest(vnet)

        ifname = vnet.ifaces[0].name
        listener = self.mcast_join(group, port, ifname)
        self.sock = listener

        self._msgwait(listener, b"Hello, Multicast!")
        self.mcast_sendto(group, port, ifname, b"Goodbye, Multicast!")
        # The reply is expected on this host's own socket as well.
        self._msgwait(listener, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Initiator: greet host 1, then wait for the answer."""
        group, port = self.MULTICAST_ADDR, 12345
        self.jointest(vnet)

        ifname = vnet.ifaces[0].name
        listener = self.mcast_join(group, port, ifname)
        self.sock = listener

        self.mcast_sendto(group, port, ifname, b"Hello, Multicast!")
        # First our own greeting comes back, then host 1's reply.
        self._msgwait(listener, b"Hello, Multicast!")
        self._msgwait(listener, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release both host handlers and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
344
345
class Test1RCrissCrossINET6(MRouteINET6TestTemplate):
    """
    One router, four hosts, and two pairs of interfaces in different FIBs.

    Hosts 1 and 2 talk to each other over if1/if2, hosts 3 and 4 over
    if3/if4; one ip6_mrouted instance is started per pair.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Bring up the topology, then start one ip6_mrouted per FIB."""
        # The parent creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map

        # The attributes are called pimd0/pimd1, but they hold the
        # ip6_mrouted processes.
        fib0_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        self.pimd0 = self.run_ip6_mrouted("test0", fib0_ifaces, fib=0)
        fib1_ifaces = [alias_map[alias].name for alias in ("if3", "if4")]
        self.pimd1 = self.run_ip6_mrouted("test1", fib1_ifaces, fib=1)
        # ip6_mrouted needs a moment before it is ready.
        time.sleep(1)

    def _run_responder(self, vnet, hello: bytes, goodbye: bytes):
        """Join the group, wait for *hello*, answer with *goodbye*, report done."""
        self.jointest(vnet)
        ifname = vnet.ifaces[0].name
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, ifname)
        self._msgwait(self.sock, hello)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, goodbye)
        self.donetest()

    def _run_initiator(self, vnet, hello: bytes, goodbye: bytes, pause=0):
        """
        Join the group, send *hello*, then expect *hello* and *goodbye* in that order.

        When *pause* is non-zero, sleep that many seconds between joining and sending.
        """
        self.jointest(vnet)
        ifname = vnet.ifaces[0].name
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, ifname)
        if pause:
            time.sleep(pause)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, hello)
        self._msgwait(self.sock, hello)
        self._msgwait(self.sock, goodbye)
        self.donetest()

    def vnet_host1_handler(self, vnet):
        """Responder of the FIB 0 pair."""
        self._run_responder(vnet, b"Hello, Multicast on FIB 0!",
                            b"Goodbye, Multicast on FIB 0!")

    def vnet_host2_handler(self, vnet):
        """Initiator of the FIB 0 pair."""
        self._run_initiator(vnet, b"Hello, Multicast on FIB 0!",
                            b"Goodbye, Multicast on FIB 0!")

    def vnet_host3_handler(self, vnet):
        """Responder of the FIB 1 pair."""
        self._run_responder(vnet, b"Hello, Multicast on FIB 1!",
                            b"Goodbye, Multicast on FIB 1!")

    def vnet_host4_handler(self, vnet):
        """Initiator of the FIB 1 pair; waits an extra second before sending."""
        self._run_initiator(vnet, b"Hello, Multicast on FIB 1!",
                            b"Goodbye, Multicast on FIB 1!", pause=1)

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four host handlers and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()
428        self.waittest()
429