xref: /freebsd/tests/sys/netpfil/pf/sctp.py (revision 6f9ddb329b07099e392c78b6e4fef1c6252de0dc)
1import pytest
2import ctypes
3import socket
4import ipaddress
5import re
6from atf_python.sys.net.tools import ToolsHelper
7from atf_python.sys.net.vnet import VnetTestTemplate
8
9import time
10
11SCTP_UNORDERED = 0x0400
12
13SCTP_NODELAY                 = 0x00000004
14SCTP_SET_PEER_PRIMARY_ADDR   = 0x00000006
15SCTP_PRIMARY_ADDR            = 0x00000007
16
17SCTP_BINDX_ADD_ADDR          = 0x00008001
18SCTP_BINDX_REM_ADDR          = 0x00008002
19
20class sockaddr_in(ctypes.Structure):
21    _fields_ = [
22        ('sin_len', ctypes.c_uint8),
23        ('sin_family', ctypes.c_uint8),
24        ('sin_port', ctypes.c_uint16),
25        ('sin_addr', ctypes.c_uint32),
26        ('sin_zero', ctypes.c_int8 * 8)
27    ]
28
29class sockaddr_in6(ctypes.Structure):
30    _fields_ = [
31        ('sin6_len',      ctypes.c_uint8),
32        ('sin6_family',   ctypes.c_uint8),
33        ('sin6_port',     ctypes.c_uint16),
34        ('sin6_flowinfo', ctypes.c_uint32),
35        ('sin6_addr',     ctypes.c_uint8 * 16),
36        ('sin6_scope_id', ctypes.c_uint32)
37    ]
38
39class sockaddr_storage(ctypes.Union):
40    _fields_ = [
41        ("v4",    sockaddr_in),
42        ("v6",   sockaddr_in6)
43    ]
44
45class sctp_sndrcvinfo(ctypes.Structure):
46    _fields_ = [
47        ('sinfo_stream',        ctypes.c_uint16),
48        ('sinfo_ssn',           ctypes.c_uint16),
49        ('sinfo_flags',         ctypes.c_uint16),
50        ('sinfo_ppid',          ctypes.c_uint32),
51        ('sinfo_context',       ctypes.c_uint32),
52        ('sinfo_timetolive',    ctypes.c_uint32),
53        ('sinfo_tsn',           ctypes.c_uint32),
54        ('sinfo_cumtsn',        ctypes.c_uint32),
55        ('sinfo_assoc_id',      ctypes.c_uint32),
56    ]
57
58class sctp_setprim(ctypes.Structure):
59    _fields_ = [
60        ('ssp_addr',        sockaddr_storage),
61        ('ssp_pad',         ctypes.c_int8 * (128 - 16)),
62        ('ssp_assoc_id',    ctypes.c_uint32),
63        ('ssp_padding',     ctypes.c_uint32)
64    ]
65
66def to_sockaddr(ip, port):
67    ip = ipaddress.ip_address(ip)
68
69    if ip.version == 4:
70        addr = sockaddr_in()
71        addr.sin_len = ctypes.sizeof(addr)
72        addr.sin_family = socket.AF_INET
73        addr.sin_port = socket.htons(port)
74        addr.sin_addr = socket.htonl(int.from_bytes(ip.packed, byteorder='big'))
75    else:
76        assert ip.version == 6
77
78        addr = sockaddr_in6()
79        addr.sin6_len = ctypes.sizeof(addr)
80        addr.sin6_family = socket.AF_INET6
81        addr.sin6_port = socket.htons(port)
82        for i in range(0, 16):
83            addr.sin6_addr[i] = ip.packed[i]
84
85    return addr
86
87class SCTPServer:
88    def __init__(self, family, port=1234):
89        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)
90
91        self._listen_fd = self._libc.socket(family, socket.SOCK_STREAM, socket.IPPROTO_SCTP)
92        if self._listen_fd == -1:
93            raise Exception("Failed to create socket")
94
95        if family == socket.AF_INET:
96            srvaddr = sockaddr_in()
97            srvaddr.sin_len = ctypes.sizeof(srvaddr)
98            srvaddr.sin_family = socket.AF_INET
99            srvaddr.sin_port = socket.htons(port)
100            srvaddr.sin_addr = socket.INADDR_ANY
101        else:
102            srvaddr = sockaddr_in6()
103            srvaddr.sin6_len = ctypes.sizeof(srvaddr)
104            srvaddr.sin6_family = family
105            srvaddr.sin6_port = socket.htons(port)
106            # Leave sin_addr empty, because ANY is zero
107
108        ret = self._libc.bind(self._listen_fd, ctypes.pointer(srvaddr),
109            ctypes.sizeof(srvaddr))
110        if ret == -1:
111            raise Exception("Failed to bind: %d" % ctypes.get_errno())
112
113        ret = self._libc.listen(self._listen_fd, 2)
114        if ret == -1:
115            raise Exception("Failed to listen")
116
117    def _to_string(self, buf):
118        return ''.join([chr(int.from_bytes(i, byteorder='big')) for i in buf]).rstrip('\x00')
119
120    def accept(self, vnet):
121        fd = self._libc.accept(self._listen_fd, 0, 0)
122        if fd < 0:
123            raise Exception("Failed to accept")
124
125        print("SCTPServer: connection opened")
126        while True:
127            rcvinfo = sctp_sndrcvinfo()
128            flags = ctypes.c_int()
129            buf = ctypes.create_string_buffer(128)
130
131            # Receive a single message, and inform the other vnet about it.
132            ret = self._libc.sctp_recvmsg(fd, ctypes.cast(buf, ctypes.c_void_p), 128,
133                0, 0, ctypes.pointer(rcvinfo), ctypes.pointer(flags))
134            if ret < 0:
135                print("SCTPServer: connection closed")
136                return
137            if ret == 0:
138                continue
139
140            rcvd = {}
141            rcvd['ppid'] = socket.ntohl(rcvinfo.sinfo_ppid)
142            rcvd['data'] = self._to_string(buf)
143            rcvd['len'] = ret
144            print(rcvd)
145            vnet.pipe.send(rcvd)
146
147class SCTPClient:
148    def __init__(self, ip, port=1234, fromaddr=None):
149        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)
150
151        if ipaddress.ip_address(ip).version == 4:
152            family = socket.AF_INET
153        else:
154            family = socket.AF_INET6
155
156        self._fd = self._libc.socket(family, socket.SOCK_STREAM,
157            socket.IPPROTO_SCTP)
158        if self._fd == -1:
159            raise Exception("Failed to open socket")
160
161        if fromaddr is not None:
162            addr = to_sockaddr(fromaddr, 0)
163
164            ret = self._libc.bind(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
165            if ret != 0:
166                print("bind() => %d", ctypes.get_errno())
167                raise
168
169        addr = to_sockaddr(ip, port)
170        ret = self._libc.connect(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
171        if ret == -1:
172            raise Exception("Failed to connect")
173
174        # Enable NODELAY, because otherwise the sending host may wait for SACK
175        # on a data chunk we've removed
176        enable = ctypes.c_int(1)
177        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
178                SCTP_NODELAY, ctypes.pointer(enable), 4)
179
180    def newpeer(self, addr):
181        print("newpeer(%s)" % (addr))
182
183        setp = sctp_setprim()
184        a = to_sockaddr(addr, 0)
185        if type(a) is sockaddr_in:
186            setp.ssp_addr.v4 = a
187        else:
188            assert type(a) is sockaddr_in6
189            setp.ssp_addr.v6 = a
190
191        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
192            SCTP_PRIMARY_ADDR, ctypes.pointer(setp), ctypes.sizeof(setp))
193        if ret != 0:
194            print("errno %d" % ctypes.get_errno())
195            raise Exception(ctypes.get_errno())
196
197    def newprimary(self, addr):
198        print("newprimary(%s)" % (addr))
199
200        # Strictly speaking needs to be struct sctp_setpeerprim, but that's
201        # identical to sctp_setprim
202        setp = sctp_setprim()
203        a = to_sockaddr(addr, 0)
204        if type(a) is sockaddr_in:
205            setp.ssp_addr.v4 = a
206        else:
207            assert type(a) is sockaddr_in6
208            setp.ssp_addr.v6 = a
209
210        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
211            SCTP_SET_PEER_PRIMARY_ADDR, ctypes.pointer(setp), ctypes.sizeof(setp))
212        if ret != 0:
213            print("errno %d" % ctypes.get_errno())
214            raise
215
216    def bindx(self, addr, add):
217        print("bindx(%s, %s)" % (addr, add))
218
219        addr = to_sockaddr(addr, 0)
220
221        if add:
222            flag = SCTP_BINDX_ADD_ADDR
223        else:
224            flag = SCTP_BINDX_REM_ADDR
225        ret = self._libc.sctp_bindx(self._fd, ctypes.pointer(addr), 1, flag)
226        if ret != 0:
227            print("sctp_bindx() errno %d" % ctypes.get_errno())
228            raise
229
230    def send(self, buf, ppid, ordered=False):
231        flags = 0
232
233        if not ordered:
234            flags = SCTP_UNORDERED
235
236        ppid = socket.htonl(ppid)
237        ret = self._libc.sctp_sendmsg(self._fd, ctypes.c_char_p(buf), len(buf),
238            ctypes.c_void_p(0), 0, ppid, flags, 0, 0, 0)
239        if ret < 0:
240            raise Exception("Failed to send message")
241
242    def close(self):
243        self._libc.close(self._fd)
244        self._fd = -1
245
246class TestSCTP(VnetTestTemplate):
247    REQUIRED_MODULES = ["sctp", "pf"]
248    TOPOLOGY = {
249        "vnet1": {"ifaces": ["if1"]},
250        "vnet2": {"ifaces": ["if1"]},
251        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
252    }
253
254    def vnet2_handler(self, vnet):
255        # Give ourself a second IP address, for multihome testing
256        ifname = vnet.iface_alias_map["if1"].name
257        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.3/24" % ifname)
258
259        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
260        srv = SCTPServer(socket.AF_INET, port=1234)
261        while True:
262            srv.accept(vnet)
263
264    @pytest.mark.require_user("root")
265    def test_multihome(self):
266        srv_vnet = self.vnet_map["vnet2"]
267
268        ToolsHelper.print_output("/sbin/pfctl -e")
269        ToolsHelper.pf_rules([
270            "block proto sctp",
271            "pass inet proto sctp to 192.0.2.0/24",
272            "pass on lo"])
273
274        # Give the server some time to come up
275        time.sleep(3)
276
277        # Sanity check, we can communicate with the primary address.
278        client = SCTPClient("192.0.2.3", 1234)
279        client.send(b"hello", 0)
280        rcvd = self.wait_object(srv_vnet.pipe)
281        print(rcvd)
282        assert rcvd['ppid'] == 0
283        assert rcvd['data'] == "hello"
284
285        try:
286            client.newpeer("192.0.2.2")
287            client.send(b"world", 0)
288            rcvd = self.wait_object(srv_vnet.pipe)
289            print(rcvd)
290            assert rcvd['ppid'] == 0
291            assert rcvd['data'] == "world"
292        finally:
293            # Debug output
294            ToolsHelper.print_output("/sbin/pfctl -ss")
295            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
296
297        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
298        states = ToolsHelper.get_output("/sbin/pfctl -ss")
299        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
300        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
301
302    @pytest.mark.require_user("root")
303    def test_multihome_asconf(self):
304        srv_vnet = self.vnet_map["vnet2"]
305
306        # Assign a second IP to ourselves
307        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.10/24"
308            % self.vnet.iface_alias_map["if1"].name)
309        ToolsHelper.print_output("/sbin/pfctl -e")
310        ToolsHelper.pf_rules([
311            "block proto sctp",
312            "pass on lo",
313            "pass inet proto sctp from 192.0.2.0/24"])
314
315        # Give the server some time to come up
316        time.sleep(3)
317
318        # Sanity check, we can communicate with the primary address.
319        client = SCTPClient("192.0.2.3", 1234, "192.0.2.1")
320        client.send(b"hello", 0)
321        rcvd = self.wait_object(srv_vnet.pipe)
322        print(rcvd)
323        assert rcvd['ppid'] == 0
324        assert rcvd['data'] == "hello"
325
326        # Now add our second address to the connection
327        client.bindx("192.0.2.10", True)
328
329        # We can still communicate
330        client.send(b"world", 0)
331        rcvd = self.wait_object(srv_vnet.pipe)
332        print(rcvd)
333        assert rcvd['ppid'] == 0
334        assert rcvd['data'] == "world"
335
336        # Now change to a different peer address
337        try:
338            client.newprimary("192.0.2.10")
339            client.send(b"!", 0)
340            rcvd = self.wait_object(srv_vnet.pipe, 5)
341            print(rcvd)
342            assert rcvd['ppid'] == 0
343            assert rcvd['data'] == "!"
344        finally:
345            # Debug output
346            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
347
348        # Ensure we have the states we'd expect
349        states = ToolsHelper.get_output("/sbin/pfctl -ss")
350        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
351        assert re.search(r"all sctp 192.0.2.10:.*192.0.2.3:1234", states)
352
353        # Now remove 192.0.2.1 as an address
354        client.bindx("192.0.2.1", False)
355
356        # We can still communicate
357        try:
358            client.send(b"More data", 0)
359            rcvd = self.wait_object(srv_vnet.pipe, 5)
360            print(rcvd)
361            assert rcvd['ppid'] == 0
362            assert rcvd['data'] =="More data"
363        finally:
364            # Debug output
365            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
366
367        # Verify that state is closing
368        states = ToolsHelper.get_output("/sbin/pfctl -ss")
369        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234.*SHUTDOWN", states)
370
371
372    @pytest.mark.require_user("root")
373    def test_permutation_if_bound(self):
374        # Test that we generate all permutations of src/dst addresses.
375        # Assign two addresses to each end, and check for the expected states
376        srv_vnet = self.vnet_map["vnet2"]
377
378        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
379        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)
380
381        ToolsHelper.print_output("/sbin/pfctl -e")
382        ToolsHelper.pf_rules([
383            "set state-policy if-bound",
384            "block proto sctp",
385            "pass on lo",
386            "pass inet proto sctp to 192.0.2.0/24"])
387
388        # Give the server some time to come up
389        time.sleep(3)
390
391        # Sanity check, we can communicate with the primary address.
392        client = SCTPClient("192.0.2.3", 1234)
393        client.send(b"hello", 0)
394        rcvd = self.wait_object(srv_vnet.pipe)
395        print(rcvd)
396        assert rcvd['ppid'] == 0
397        assert rcvd['data'] == "hello"
398
399        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
400        states = ToolsHelper.get_output("/sbin/pfctl -ss")
401        print(states)
402        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
403        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
404        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.3:1234", states)
405        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.2:1234", states)
406
407    @pytest.mark.require_user("root")
408    def test_permutation_floating(self):
409        # Test that we generate all permutations of src/dst addresses.
410        # Assign two addresses to each end, and check for the expected states
411        srv_vnet = self.vnet_map["vnet2"]
412
413        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
414        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)
415
416        ToolsHelper.print_output("/sbin/pfctl -e")
417        ToolsHelper.pf_rules([
418            "block proto sctp",
419            "pass on lo",
420            "pass inet proto sctp to 192.0.2.0/24"])
421
422        # Give the server some time to come up
423        time.sleep(3)
424
425        # Sanity check, we can communicate with the primary address.
426        client = SCTPClient("192.0.2.3", 1234)
427        client.send(b"hello", 0)
428        rcvd = self.wait_object(srv_vnet.pipe)
429        print(rcvd)
430        assert rcvd['ppid'] == 0
431        assert rcvd['data'] == "hello"
432
433        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
434        states = ToolsHelper.get_output("/sbin/pfctl -ss")
435        print(states)
436        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
437        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
438        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.3:1234", states)
439        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.2:1234", states)
440
441    @pytest.mark.require_user("root")
442    def test_limit_addresses(self):
443        srv_vnet = self.vnet_map["vnet2"]
444
445        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
446        for i in range(0, 16):
447            ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.%d/24" % (ifname, 4 + i))
448
449        ToolsHelper.print_output("/sbin/pfctl -e")
450        ToolsHelper.pf_rules([
451            "block proto sctp",
452            "pass on lo",
453            "pass inet proto sctp to 192.0.2.0/24"])
454
455        # Give the server some time to come up
456        time.sleep(3)
457
458        # Set up a connection, which will try to create states for all addresses
459        # we have assigned
460        client = SCTPClient("192.0.2.3", 1234)
461        client.send(b"hello", 0)
462        rcvd = self.wait_object(srv_vnet.pipe)
463        print(rcvd)
464        assert rcvd['ppid'] == 0
465        assert rcvd['data'] == "hello"
466
467        # But the number should be limited to 9 (original + 8 extra)
468        states = ToolsHelper.get_output("/sbin/pfctl -ss | grep 192.0.2.2")
469        print(states)
470        assert(states.count('\n') <= 9)
471
472    @pytest.mark.require_user("root")
473    def test_disallow_related(self):
474        srv_vnet = self.vnet_map["vnet2"]
475
476        ToolsHelper.print_output("/sbin/pfctl -e")
477        ToolsHelper.pf_rules([
478            "block proto sctp",
479            "pass inet proto sctp to 192.0.2.3",
480            "pass on lo"])
481
482        # Give the server some time to come up
483        time.sleep(3)
484
485        # Sanity check, we can communicate with the primary address.
486        client = SCTPClient("192.0.2.3", 1234)
487        client.send(b"hello", 0)
488        rcvd = self.wait_object(srv_vnet.pipe)
489        print(rcvd)
490        assert rcvd['ppid'] == 0
491        assert rcvd['data'] == "hello"
492
493        # This shouldn't work
494        success=False
495        try:
496            client.newpeer("192.0.2.2")
497            client.send(b"world", 0)
498            rcvd = self.wait_object(srv_vnet.pipe)
499            print(rcvd)
500            assert rcvd['ppid'] == 0
501            assert rcvd['data'] == "world"
502            success=True
503        except:
504            success=False
505        assert not success
506
507        # Check that we have a state for 192.0.2.3, but not 192.0.2.2 to 192.0.2.1
508        states = ToolsHelper.get_output("/sbin/pfctl -ss")
509        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
510        assert not re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
511
512    @pytest.mark.require_user("root")
513    def test_allow_related(self):
514        srv_vnet = self.vnet_map["vnet2"]
515
516        ToolsHelper.print_output("/sbin/pfctl -e")
517        ToolsHelper.pf_rules([
518            "set state-policy if-bound",
519            "block proto sctp",
520            "pass inet proto sctp to 192.0.2.3 keep state (allow-related)",
521            "pass on lo"])
522
523        # Give the server some time to come up
524        time.sleep(3)
525
526        # Sanity check, we can communicate with the primary address.
527        client = SCTPClient("192.0.2.3", 1234)
528        client.send(b"hello", 0)
529        rcvd = self.wait_object(srv_vnet.pipe)
530        print(rcvd)
531        assert rcvd['ppid'] == 0
532        assert rcvd['data'] == "hello"
533
534        success=False
535        try:
536            client.newpeer("192.0.2.2")
537            client.send(b"world", 0)
538            rcvd = self.wait_object(srv_vnet.pipe)
539            print(rcvd)
540            assert rcvd['ppid'] == 0
541            assert rcvd['data'] == "world"
542            success=True
543        finally:
544            # Debug output
545            ToolsHelper.print_output("/sbin/pfctl -ss")
546            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
547        assert success
548
549        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
550        states = ToolsHelper.get_output("/sbin/pfctl -ss")
551        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
552        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
553
554class TestSCTP_SRV(VnetTestTemplate):
555    REQUIRED_MODULES = ["sctp", "pf"]
556    TOPOLOGY = {
557        "vnet1": {"ifaces": ["if1"]},
558        "vnet2": {"ifaces": ["if1"]},
559        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
560    }
561
562    def vnet2_handler(self, vnet):
563        ToolsHelper.print_output("/sbin/pfctl -e")
564        ToolsHelper.pf_rules([
565            "set state-policy if-bound",
566            "pass inet proto sctp",
567            "pass on lo"])
568
569        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
570        srv = SCTPServer(socket.AF_INET, port=1234)
571        while True:
572            srv.accept(vnet)
573
574    @pytest.mark.require_user("root")
575    @pytest.mark.require_progs(["scapy"])
576    def test_initiate_tag_check(self):
577        # Ensure we don't send ABORTs in response to the other end's INIT_ACK
578        # That'd interfere with our test.
579        ToolsHelper.print_output("/sbin/sysctl net.inet.sctp.blackhole=2")
580
581        import scapy.all as sp
582
583        packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
584            / sp.SCTP(sport=1234, dport=1234) \
585            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500)
586        packet.show()
587
588        r = sp.sr1(packet, timeout=3)
589        assert r
590        r.show()
591        assert r.getlayer(sp.SCTP)
592        assert r.getlayer(sp.SCTPChunkInitAck)
593        assert r.getlayer(sp.SCTP).tag == 1
594
595        # Send another INIT with the same initiate tag, expect another init ack
596        packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
597            / sp.SCTP(sport=1234, dport=1234) \
598            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500)
599        packet.show()
600
601        r = sp.sr1(packet, timeout=3)
602        assert r
603        r.show()
604        assert r.getlayer(sp.SCTP)
605        assert r.getlayer(sp.SCTPChunkInitAck)
606        assert r.getlayer(sp.SCTP).tag == 1
607
608        # Send an INIT with a different initiate tag, expect another init ack
609        packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
610            / sp.SCTP(sport=1234, dport=1234) \
611            / sp.SCTPChunkInit(init_tag=42, n_in_streams=1, n_out_streams=1, a_rwnd=1500)
612        packet.show()
613
614        r = sp.sr1(packet, timeout=3)
615        assert r
616        r.show()
617        assert r.getlayer(sp.SCTP)
618        assert r.getlayer(sp.SCTPChunkInitAck)
619        assert r.getlayer(sp.SCTP).tag == 42
620
621    @pytest.mark.require_user("root")
622    @pytest.mark.require_progs(["scapy"])
623    def test_too_many_add_ip(self):
624        import scapy.all as sp
625        DEPTH=90
626        params=[]
627        for i in range(0, DEPTH):
628            ch = sp.SCTPChunkParamAddIPAddr(len=(DEPTH - i) * 8)
629            params.append(ch)
630        packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
631            / sp.SCTP(sport=4321, dport=1234) \
632            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500,
633                params=params)
634        packet.show()
635        sp.hexdump(packet)
636        print("len %d" % len(packet))
637
638        r = sp.sr1(packet, timeout=3)
639        # We should not get a reply to this
640        if r:
641            r.show()
642        assert not r
643
644class TestSCTPv6(VnetTestTemplate):
645    REQUIRED_MODULES = ["sctp", "pf"]
646    TOPOLOGY = {
647        "vnet1": {"ifaces": ["if1"]},
648        "vnet2": {"ifaces": ["if1"]},
649        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
650    }
651
652    def vnet2_handler(self, vnet):
653        # Give ourself a second IP address, for multihome testing
654        ifname = vnet.iface_alias_map["if1"].name
655        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname)
656
657        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
658        srv = SCTPServer(socket.AF_INET6, port=1234)
659        while True:
660            srv.accept(vnet)
661
662    @pytest.mark.require_user("root")
663    def test_multihome(self):
664        srv_vnet = self.vnet_map["vnet2"]
665
666        ToolsHelper.print_output("/sbin/pfctl -e")
667        ToolsHelper.pf_rules([
668            "block proto sctp",
669            "pass on lo",
670            "pass inet6 proto sctp to 2001:db8::0/64"])
671
672        # Give the server some time to come up
673        time.sleep(3)
674
675        # Sanity check, we can communicate with the primary address.
676        client = SCTPClient("2001:db8::3", 1234)
677        client.send(b"hello", 0)
678        rcvd = self.wait_object(srv_vnet.pipe)
679        print(rcvd)
680        assert rcvd['ppid'] == 0
681        assert rcvd['data'] == "hello"
682
683        # Now change to a different peer address
684        try:
685            client.newpeer("2001:db8::2")
686            client.send(b"world", 0)
687            rcvd = self.wait_object(srv_vnet.pipe)
688            print(rcvd)
689            assert rcvd['ppid'] == 0
690            assert rcvd['data'] == "world"
691        finally:
692            # Debug output
693            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
694
695        # Check that we have the expected states
696        states = ToolsHelper.get_output("/sbin/pfctl -ss")
697        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
698        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
699
700    @pytest.mark.require_user("root")
701    def test_multihome_asconf(self):
702        srv_vnet = self.vnet_map["vnet2"]
703
704        # Assign a second IP to ourselves
705        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::10/64"
706            % self.vnet.iface_alias_map["if1"].name)
707        ToolsHelper.print_output("/sbin/pfctl -e")
708        ToolsHelper.pf_rules([
709            "block proto sctp",
710            "pass on lo",
711            "pass inet6 proto sctp from 2001:db8::/64"])
712
713        # Give the server some time to come up
714        time.sleep(3)
715
716        # Sanity check, we can communicate with the primary address.
717        client = SCTPClient("2001:db8::3", 1234, "2001:db8::1")
718        client.send(b"hello", 0)
719        rcvd = self.wait_object(srv_vnet.pipe)
720        print(rcvd)
721        assert rcvd['ppid'] == 0
722        assert rcvd['data'] == "hello"
723
724        # Now add our second address to the connection
725        client.bindx("2001:db8::10", True)
726
727        # We can still communicate
728        client.send(b"world", 0)
729        rcvd = self.wait_object(srv_vnet.pipe)
730        print(rcvd)
731        assert rcvd['ppid'] == 0
732        assert rcvd['data'] == "world"
733
734        # Now change to a different peer address
735        try:
736            client.newprimary("2001:db8::10")
737            client.send(b"!", 0)
738            rcvd = self.wait_object(srv_vnet.pipe, 5)
739            print(rcvd)
740            assert rcvd['ppid'] == 0
741            assert rcvd['data'] == "!"
742        finally:
743            # Debug output
744            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
745
746        # Check that we have the expected states
747        states = ToolsHelper.get_output("/sbin/pfctl -ss")
748        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
749        assert re.search(r"all sctp 2001:db8::10\[.*2001:db8::3\[1234\]", states)
750
751        # Now remove 2001:db8::1 as an address
752        client.bindx("2001:db8::1", False)
753
754        # Wecan still communicate
755        try:
756            client.send(b"More data", 0)
757            rcvd = self.wait_object(srv_vnet.pipe, 5)
758            print(rcvd)
759            assert rcvd['ppid'] == 0
760            assert rcvd['data'] == "More data"
761        finally:
762            # Debug output
763            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
764
765        # Verify that the state is closing
766        states = ToolsHelper.get_output("/sbin/pfctl -ss")
767        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\].*SHUTDOWN", states)
768
769    @pytest.mark.require_user("root")
770    def test_permutation(self):
771        # Test that we generate all permutations of src/dst addresses.
772        # Assign two addresses to each end, and check for the expected states
773        srv_vnet = self.vnet_map["vnet2"]
774
775        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
776        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)
777
778        ToolsHelper.print_output("/sbin/pfctl -e")
779        ToolsHelper.pf_rules([
780            "set state-policy if-bound",
781            "block proto sctp",
782            "pass on lo",
783            "pass inet6 proto sctp to 2001:db8::0/64"])
784
785        # Give the server some time to come up
786        time.sleep(3)
787
788        # Sanity check, we can communicate with the primary address.
789        client = SCTPClient("2001:db8::3", 1234)
790        client.send(b"hello", 0)
791        rcvd = self.wait_object(srv_vnet.pipe)
792        print(rcvd)
793        assert rcvd['ppid'] == 0
794        assert rcvd['data'] == "hello"
795
796        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
797        states = ToolsHelper.get_output("/sbin/pfctl -ss")
798        print(states)
799        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
800        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
801        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
802        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)
803
804    @pytest.mark.require_user("root")
805    def test_permutation_floating(self):
806        # Test that we generate all permutations of src/dst addresses.
807        # Assign two addresses to each end, and check for the expected states
808        srv_vnet = self.vnet_map["vnet2"]
809
810        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
811        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)
812
813        ToolsHelper.print_output("/sbin/pfctl -e")
814        ToolsHelper.pf_rules([
815            "block proto sctp",
816            "pass on lo",
817            "pass inet6 proto sctp to 2001:db8::0/64"])
818
819        # Give the server some time to come up
820        time.sleep(3)
821
822        # Sanity check, we can communicate with the primary address.
823        client = SCTPClient("2001:db8::3", 1234)
824        client.send(b"hello", 0)
825        rcvd = self.wait_object(srv_vnet.pipe)
826        print(rcvd)
827        assert rcvd['ppid'] == 0
828        assert rcvd['data'] == "hello"
829
830        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
831        states = ToolsHelper.get_output("/sbin/pfctl -ss")
832        print(states)
833        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
834        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
835        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
836        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)
837