xref: /freebsd/tests/sys/netpfil/pf/sctp.py (revision e4f2733df8c9d2fd0c5e8fdc8bec002bf39811f3)
1import pytest
2import ctypes
3import socket
4import ipaddress
5import re
6from atf_python.sys.net.tools import ToolsHelper
7from atf_python.sys.net.vnet import VnetTestTemplate
8
9import time
10
11SCTP_UNORDERED = 0x0400
12
13SCTP_NODELAY                 = 0x00000004
14SCTP_SET_PEER_PRIMARY_ADDR   = 0x00000006
15SCTP_PRIMARY_ADDR            = 0x00000007
16
17SCTP_BINDX_ADD_ADDR          = 0x00008001
18SCTP_BINDX_REM_ADDR          = 0x00008002
19
20class sockaddr_in(ctypes.Structure):
21    _fields_ = [
22        ('sin_len', ctypes.c_uint8),
23        ('sin_family', ctypes.c_uint8),
24        ('sin_port', ctypes.c_uint16),
25        ('sin_addr', ctypes.c_uint32),
26        ('sin_zero', ctypes.c_int8 * 8)
27    ]
28
29class sockaddr_in6(ctypes.Structure):
30    _fields_ = [
31        ('sin6_len',      ctypes.c_uint8),
32        ('sin6_family',   ctypes.c_uint8),
33        ('sin6_port',     ctypes.c_uint16),
34        ('sin6_flowinfo', ctypes.c_uint32),
35        ('sin6_addr',     ctypes.c_uint8 * 16),
36        ('sin6_scope_id', ctypes.c_uint32)
37    ]
38
39class sockaddr_storage(ctypes.Union):
40    _fields_ = [
41        ("v4",    sockaddr_in),
42        ("v6",   sockaddr_in6)
43    ]
44
45class sctp_sndrcvinfo(ctypes.Structure):
46    _fields_ = [
47        ('sinfo_stream',        ctypes.c_uint16),
48        ('sinfo_ssn',           ctypes.c_uint16),
49        ('sinfo_flags',         ctypes.c_uint16),
50        ('sinfo_ppid',          ctypes.c_uint32),
51        ('sinfo_context',       ctypes.c_uint32),
52        ('sinfo_timetolive',    ctypes.c_uint32),
53        ('sinfo_tsn',           ctypes.c_uint32),
54        ('sinfo_cumtsn',        ctypes.c_uint32),
55        ('sinfo_assoc_id',      ctypes.c_uint32),
56    ]
57
58class sctp_setprim(ctypes.Structure):
59    _fields_ = [
60        ('ssp_addr',        sockaddr_storage),
61        ('ssp_pad',         ctypes.c_int8 * (128 - 16)),
62        ('ssp_assoc_id',    ctypes.c_uint32),
63        ('ssp_padding',     ctypes.c_uint32)
64    ]
65
66def to_sockaddr(ip, port):
67    ip = ipaddress.ip_address(ip)
68
69    if ip.version == 4:
70        addr = sockaddr_in()
71        addr.sin_len = ctypes.sizeof(addr)
72        addr.sin_family = socket.AF_INET
73        addr.sin_port = socket.htons(port)
74        addr.sin_addr = socket.htonl(int.from_bytes(ip.packed, byteorder='big'))
75    else:
76        assert ip.version == 6
77
78        addr = sockaddr_in6()
79        addr.sin6_len = ctypes.sizeof(addr)
80        addr.sin6_family = socket.AF_INET6
81        addr.sin6_port = socket.htons(port)
82        for i in range(0, 16):
83            addr.sin6_addr[i] = ip.packed[i]
84
85    return addr
86
87class SCTPServer:
88    def __init__(self, family, port=1234):
89        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)
90
91        self._listen_fd = self._libc.socket(family, socket.SOCK_STREAM, socket.IPPROTO_SCTP)
92        if self._listen_fd == -1:
93            raise Exception("Failed to create socket")
94
95        if family == socket.AF_INET:
96            srvaddr = sockaddr_in()
97            srvaddr.sin_len = ctypes.sizeof(srvaddr)
98            srvaddr.sin_family = socket.AF_INET
99            srvaddr.sin_port = socket.htons(port)
100            srvaddr.sin_addr = socket.INADDR_ANY
101        else:
102            srvaddr = sockaddr_in6()
103            srvaddr.sin6_len = ctypes.sizeof(srvaddr)
104            srvaddr.sin6_family = family
105            srvaddr.sin6_port = socket.htons(port)
106            # Leave sin_addr empty, because ANY is zero
107
108        ret = self._libc.bind(self._listen_fd, ctypes.pointer(srvaddr),
109            ctypes.sizeof(srvaddr))
110        if ret == -1:
111            raise Exception("Failed to bind: %d" % ctypes.get_errno())
112
113        ret = self._libc.listen(self._listen_fd, 2)
114        if ret == -1:
115            raise Exception("Failed to listen")
116
117    def _to_string(self, buf):
118        return ''.join([chr(int.from_bytes(i, byteorder='big')) for i in buf]).rstrip('\x00')
119
120    def accept(self, vnet):
121        fd = self._libc.accept(self._listen_fd, 0, 0)
122        if fd < 0:
123            raise Exception("Failed to accept")
124
125        print("SCTPServer: connection opened")
126        while True:
127            rcvinfo = sctp_sndrcvinfo()
128            flags = ctypes.c_int()
129            buf = ctypes.create_string_buffer(128)
130
131            # Receive a single message, and inform the other vnet about it.
132            ret = self._libc.sctp_recvmsg(fd, ctypes.cast(buf, ctypes.c_void_p), 128,
133                0, 0, ctypes.pointer(rcvinfo), ctypes.pointer(flags))
134            if ret < 0:
135                print("SCTPServer: connection closed")
136                return
137            if ret == 0:
138                continue
139
140            rcvd = {}
141            rcvd['ppid'] = socket.ntohl(rcvinfo.sinfo_ppid)
142            rcvd['data'] = self._to_string(buf)
143            rcvd['len'] = ret
144            print(rcvd)
145            vnet.pipe.send(rcvd)
146
147class SCTPClient:
148    def __init__(self, ip, port=1234, fromaddr=None):
149        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)
150
151        if ipaddress.ip_address(ip).version == 4:
152            family = socket.AF_INET
153        else:
154            family = socket.AF_INET6
155
156        self._fd = self._libc.socket(family, socket.SOCK_STREAM,
157            socket.IPPROTO_SCTP)
158        if self._fd == -1:
159            raise Exception("Failed to open socket")
160
161        if fromaddr is not None:
162            addr = to_sockaddr(fromaddr, 0)
163
164            ret = self._libc.bind(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
165            if ret != 0:
166                print("bind() => %d", ctypes.get_errno())
167                raise
168
169        addr = to_sockaddr(ip, port)
170        ret = self._libc.connect(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
171        if ret == -1:
172            raise Exception("Failed to connect")
173
174        # Enable NODELAY, because otherwise the sending host may wait for SACK
175        # on a data chunk we've removed
176        enable = ctypes.c_int(1)
177        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
178                SCTP_NODELAY, ctypes.pointer(enable), 4)
179
180    def newpeer(self, addr):
181        print("newpeer(%s)" % (addr))
182
183        setp = sctp_setprim()
184        a = to_sockaddr(addr, 0)
185        if type(a) is sockaddr_in:
186            setp.ssp_addr.v4 = a
187        else:
188            assert type(a) is sockaddr_in6
189            setp.ssp_addr.v6 = a
190
191        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
192            SCTP_PRIMARY_ADDR, ctypes.pointer(setp), ctypes.sizeof(setp))
193        if ret != 0:
194            print("errno %d" % ctypes.get_errno())
195            raise Exception(ctypes.get_errno())
196
197    def newprimary(self, addr):
198        print("newprimary(%s)" % (addr))
199
200        # Strictly speaking needs to be struct sctp_setpeerprim, but that's
201        # identical to sctp_setprim
202        setp = sctp_setprim()
203        a = to_sockaddr(addr, 0)
204        if type(a) is sockaddr_in:
205            setp.ssp_addr.v4 = a
206        else:
207            assert type(a) is sockaddr_in6
208            setp.ssp_addr.v6 = a
209
210        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
211            SCTP_SET_PEER_PRIMARY_ADDR, ctypes.pointer(setp), ctypes.sizeof(setp))
212        if ret != 0:
213            print("errno %d" % ctypes.get_errno())
214            raise
215
216    def bindx(self, addr, add):
217        print("bindx(%s, %s)" % (addr, add))
218
219        addr = to_sockaddr(addr, 0)
220
221        if add:
222            flag = SCTP_BINDX_ADD_ADDR
223        else:
224            flag = SCTP_BINDX_REM_ADDR
225        ret = self._libc.sctp_bindx(self._fd, ctypes.pointer(addr), 1, flag)
226        if ret != 0:
227            print("sctp_bindx() errno %d" % ctypes.get_errno())
228            raise
229
230    def send(self, buf, ppid, ordered=False):
231        flags = 0
232
233        if not ordered:
234            flags = SCTP_UNORDERED
235
236        ppid = socket.htonl(ppid)
237        ret = self._libc.sctp_sendmsg(self._fd, ctypes.c_char_p(buf), len(buf),
238            ctypes.c_void_p(0), 0, ppid, flags, 0, 0, 0)
239        if ret < 0:
240            raise Exception("Failed to send message")
241
242    def close(self):
243        self._libc.close(self._fd)
244        self._fd = -1
245
246class TestSCTP(VnetTestTemplate):
247    REQUIRED_MODULES = ["sctp", "pf"]
248    TOPOLOGY = {
249        "vnet1": {"ifaces": ["if1"]},
250        "vnet2": {"ifaces": ["if1"]},
251        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
252    }
253
254    def vnet2_handler(self, vnet):
255        # Give ourself a second IP address, for multihome testing
256        ifname = vnet.iface_alias_map["if1"].name
257        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.3/24" % ifname)
258
259        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
260        srv = SCTPServer(socket.AF_INET, port=1234)
261        while True:
262            srv.accept(vnet)
263
264    @pytest.mark.require_user("root")
265    def test_multihome(self):
266        srv_vnet = self.vnet_map["vnet2"]
267
268        ToolsHelper.print_output("/sbin/pfctl -e")
269        ToolsHelper.pf_rules([
270            "block proto sctp",
271            "pass inet proto sctp to 192.0.2.0/24",
272            "pass on lo"])
273
274        # Sanity check, we can communicate with the primary address.
275        client = SCTPClient("192.0.2.3", 1234)
276        client.send(b"hello", 0)
277        rcvd = self.wait_object(srv_vnet.pipe)
278        print(rcvd)
279        assert rcvd['ppid'] == 0
280        assert rcvd['data'] == "hello"
281
282        try:
283            client.newpeer("192.0.2.2")
284            client.send(b"world", 0)
285            rcvd = self.wait_object(srv_vnet.pipe)
286            print(rcvd)
287            assert rcvd['ppid'] == 0
288            assert rcvd['data'] == "world"
289        finally:
290            # Debug output
291            ToolsHelper.print_output("/sbin/pfctl -ss")
292            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
293
294        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
295        states = ToolsHelper.get_output("/sbin/pfctl -ss")
296        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
297        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
298
299    @pytest.mark.require_user("root")
300    def test_multihome_asconf(self):
301        srv_vnet = self.vnet_map["vnet2"]
302
303        # Assign a second IP to ourselves
304        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.10/24"
305            % self.vnet.iface_alias_map["if1"].name)
306        ToolsHelper.print_output("/sbin/pfctl -e")
307        ToolsHelper.pf_rules([
308            "block proto sctp",
309            "pass on lo",
310            "pass inet proto sctp from 192.0.2.0/24"])
311
312        # Sanity check, we can communicate with the primary address.
313        client = SCTPClient("192.0.2.3", 1234, "192.0.2.1")
314        client.send(b"hello", 0)
315        rcvd = self.wait_object(srv_vnet.pipe)
316        print(rcvd)
317        assert rcvd['ppid'] == 0
318        assert rcvd['data'] == "hello"
319
320        # Now add our second address to the connection
321        client.bindx("192.0.2.10", True)
322
323        # We can still communicate
324        client.send(b"world", 0)
325        rcvd = self.wait_object(srv_vnet.pipe)
326        print(rcvd)
327        assert rcvd['ppid'] == 0
328        assert rcvd['data'] == "world"
329
330        # Now change to a different peer address
331        try:
332            client.newprimary("192.0.2.10")
333            client.send(b"!", 0)
334            rcvd = self.wait_object(srv_vnet.pipe, 5)
335            print(rcvd)
336            assert rcvd['ppid'] == 0
337            assert rcvd['data'] == "!"
338        finally:
339            # Debug output
340            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
341
342        # Ensure we have the states we'd expect
343        states = ToolsHelper.get_output("/sbin/pfctl -ss")
344        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
345        assert re.search(r"all sctp 192.0.2.10:.*192.0.2.3:1234", states)
346
347        # Now remove 192.0.2.1 as an address
348        client.bindx("192.0.2.1", False)
349
350        # We can still communicate
351        try:
352            client.send(b"More data", 0)
353            rcvd = self.wait_object(srv_vnet.pipe, 5)
354            print(rcvd)
355            assert rcvd['ppid'] == 0
356            assert rcvd['data'] =="More data"
357        finally:
358            # Debug output
359            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
360
361        # Verify that state is closing
362        states = ToolsHelper.get_output("/sbin/pfctl -ss")
363        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234.*SHUTDOWN", states)
364
365
366    @pytest.mark.require_user("root")
367    def test_permutation_if_bound(self):
368        # Test that we generate all permutations of src/dst addresses.
369        # Assign two addresses to each end, and check for the expected states
370        srv_vnet = self.vnet_map["vnet2"]
371
372        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
373        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)
374
375        ToolsHelper.print_output("/sbin/pfctl -e")
376        ToolsHelper.pf_rules([
377            "set state-policy if-bound",
378            "block proto sctp",
379            "pass on lo",
380            "pass inet proto sctp to 192.0.2.0/24"])
381
382        # Sanity check, we can communicate with the primary address.
383        client = SCTPClient("192.0.2.3", 1234)
384        client.send(b"hello", 0)
385        rcvd = self.wait_object(srv_vnet.pipe)
386        print(rcvd)
387        assert rcvd['ppid'] == 0
388        assert rcvd['data'] == "hello"
389
390        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
391        states = ToolsHelper.get_output("/sbin/pfctl -ss")
392        print(states)
393        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
394        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
395        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.3:1234", states)
396        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.2:1234", states)
397
398    @pytest.mark.require_user("root")
399    def test_permutation_floating(self):
400        # Test that we generate all permutations of src/dst addresses.
401        # Assign two addresses to each end, and check for the expected states
402        srv_vnet = self.vnet_map["vnet2"]
403
404        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
405        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)
406
407        ToolsHelper.print_output("/sbin/pfctl -e")
408        ToolsHelper.pf_rules([
409            "block proto sctp",
410            "pass on lo",
411            "pass inet proto sctp to 192.0.2.0/24"])
412
413        # Sanity check, we can communicate with the primary address.
414        client = SCTPClient("192.0.2.3", 1234)
415        client.send(b"hello", 0)
416        rcvd = self.wait_object(srv_vnet.pipe)
417        print(rcvd)
418        assert rcvd['ppid'] == 0
419        assert rcvd['data'] == "hello"
420
421        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
422        states = ToolsHelper.get_output("/sbin/pfctl -ss")
423        print(states)
424        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
425        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
426        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.3:1234", states)
427        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.2:1234", states)
428
429    @pytest.mark.require_user("root")
430    def test_disallow_related(self):
431        srv_vnet = self.vnet_map["vnet2"]
432
433        ToolsHelper.print_output("/sbin/pfctl -e")
434        ToolsHelper.pf_rules([
435            "block proto sctp",
436            "pass inet proto sctp to 192.0.2.3",
437            "pass on lo"])
438
439        # Sanity check, we can communicate with the primary address.
440        client = SCTPClient("192.0.2.3", 1234)
441        client.send(b"hello", 0)
442        rcvd = self.wait_object(srv_vnet.pipe)
443        print(rcvd)
444        assert rcvd['ppid'] == 0
445        assert rcvd['data'] == "hello"
446
447        # This shouldn't work
448        success=False
449        try:
450            client.newpeer("192.0.2.2")
451            client.send(b"world", 0)
452            rcvd = self.wait_object(srv_vnet.pipe)
453            print(rcvd)
454            assert rcvd['ppid'] == 0
455            assert rcvd['data'] == "world"
456            success=True
457        except:
458            success=False
459        assert not success
460
461        # Check that we have a state for 192.0.2.3, but not 192.0.2.2 to 192.0.2.1
462        states = ToolsHelper.get_output("/sbin/pfctl -ss")
463        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
464        assert not re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
465
466    @pytest.mark.require_user("root")
467    def test_allow_related(self):
468        srv_vnet = self.vnet_map["vnet2"]
469
470        ToolsHelper.print_output("/sbin/pfctl -e")
471        ToolsHelper.pf_rules([
472            "set state-policy if-bound",
473            "block proto sctp",
474            "pass inet proto sctp to 192.0.2.3 keep state (allow-related)",
475            "pass on lo"])
476
477        # Sanity check, we can communicate with the primary address.
478        client = SCTPClient("192.0.2.3", 1234)
479        client.send(b"hello", 0)
480        rcvd = self.wait_object(srv_vnet.pipe)
481        print(rcvd)
482        assert rcvd['ppid'] == 0
483        assert rcvd['data'] == "hello"
484
485        success=False
486        try:
487            client.newpeer("192.0.2.2")
488            client.send(b"world", 0)
489            rcvd = self.wait_object(srv_vnet.pipe)
490            print(rcvd)
491            assert rcvd['ppid'] == 0
492            assert rcvd['data'] == "world"
493            success=True
494        finally:
495            # Debug output
496            ToolsHelper.print_output("/sbin/pfctl -ss")
497            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
498        assert success
499
500        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
501        states = ToolsHelper.get_output("/sbin/pfctl -ss")
502        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
503        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
504
505class TestSCTPv6(VnetTestTemplate):
506    REQUIRED_MODULES = ["sctp", "pf"]
507    TOPOLOGY = {
508        "vnet1": {"ifaces": ["if1"]},
509        "vnet2": {"ifaces": ["if1"]},
510        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
511    }
512
513    def vnet2_handler(self, vnet):
514        # Give ourself a second IP address, for multihome testing
515        ifname = vnet.iface_alias_map["if1"].name
516        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname)
517
518        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
519        srv = SCTPServer(socket.AF_INET6, port=1234)
520        while True:
521            srv.accept(vnet)
522
523    @pytest.mark.require_user("root")
524    def test_multihome(self):
525        srv_vnet = self.vnet_map["vnet2"]
526
527        ToolsHelper.print_output("/sbin/pfctl -e")
528        ToolsHelper.pf_rules([
529            "block proto sctp",
530            "pass on lo",
531            "pass inet6 proto sctp to 2001:db8::0/64"])
532
533        # Sanity check, we can communicate with the primary address.
534        client = SCTPClient("2001:db8::3", 1234)
535        client.send(b"hello", 0)
536        rcvd = self.wait_object(srv_vnet.pipe)
537        print(rcvd)
538        assert rcvd['ppid'] == 0
539        assert rcvd['data'] == "hello"
540
541        # Now change to a different peer address
542        try:
543            client.newpeer("2001:db8::2")
544            client.send(b"world", 0)
545            rcvd = self.wait_object(srv_vnet.pipe)
546            print(rcvd)
547            assert rcvd['ppid'] == 0
548            assert rcvd['data'] == "world"
549        finally:
550            # Debug output
551            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
552
553        # Check that we have the expected states
554        states = ToolsHelper.get_output("/sbin/pfctl -ss")
555        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
556        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
557
558    @pytest.mark.require_user("root")
559    def test_multihome_asconf(self):
560        srv_vnet = self.vnet_map["vnet2"]
561
562        # Assign a second IP to ourselves
563        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::10/64"
564            % self.vnet.iface_alias_map["if1"].name)
565        ToolsHelper.print_output("/sbin/pfctl -e")
566        ToolsHelper.pf_rules([
567            "block proto sctp",
568            "pass on lo",
569            "pass inet6 proto sctp from 2001:db8::/64"])
570
571        # Sanity check, we can communicate with the primary address.
572        client = SCTPClient("2001:db8::3", 1234, "2001:db8::1")
573        client.send(b"hello", 0)
574        rcvd = self.wait_object(srv_vnet.pipe)
575        print(rcvd)
576        assert rcvd['ppid'] == 0
577        assert rcvd['data'] == "hello"
578
579        # Now add our second address to the connection
580        client.bindx("2001:db8::10", True)
581
582        # We can still communicate
583        client.send(b"world", 0)
584        rcvd = self.wait_object(srv_vnet.pipe)
585        print(rcvd)
586        assert rcvd['ppid'] == 0
587        assert rcvd['data'] == "world"
588
589        # Now change to a different peer address
590        try:
591            client.newprimary("2001:db8::10")
592            client.send(b"!", 0)
593            rcvd = self.wait_object(srv_vnet.pipe, 5)
594            print(rcvd)
595            assert rcvd['ppid'] == 0
596            assert rcvd['data'] == "!"
597        finally:
598            # Debug output
599            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
600
601        # Check that we have the expected states
602        states = ToolsHelper.get_output("/sbin/pfctl -ss")
603        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
604        assert re.search(r"all sctp 2001:db8::10\[.*2001:db8::3\[1234\]", states)
605
606        # Now remove 2001:db8::1 as an address
607        client.bindx("2001:db8::1", False)
608
609        # Wecan still communicate
610        try:
611            client.send(b"More data", 0)
612            rcvd = self.wait_object(srv_vnet.pipe, 5)
613            print(rcvd)
614            assert rcvd['ppid'] == 0
615            assert rcvd['data'] == "More data"
616        finally:
617            # Debug output
618            ToolsHelper.print_output("/sbin/pfctl -ss -vv")
619
620        # Verify that the state is closing
621        states = ToolsHelper.get_output("/sbin/pfctl -ss")
622        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\].*SHUTDOWN", states)
623
624    @pytest.mark.require_user("root")
625    def test_permutation(self):
626        # Test that we generate all permutations of src/dst addresses.
627        # Assign two addresses to each end, and check for the expected states
628        srv_vnet = self.vnet_map["vnet2"]
629
630        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
631        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)
632
633        ToolsHelper.print_output("/sbin/pfctl -e")
634        ToolsHelper.pf_rules([
635            "set state-policy if-bound",
636            "block proto sctp",
637            "pass on lo",
638            "pass inet6 proto sctp to 2001:db8::0/64"])
639
640        # Sanity check, we can communicate with the primary address.
641        client = SCTPClient("2001:db8::3", 1234)
642        client.send(b"hello", 0)
643        rcvd = self.wait_object(srv_vnet.pipe)
644        print(rcvd)
645        assert rcvd['ppid'] == 0
646        assert rcvd['data'] == "hello"
647
648        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
649        states = ToolsHelper.get_output("/sbin/pfctl -ss")
650        print(states)
651        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
652        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
653        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
654        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)
655
656    @pytest.mark.require_user("root")
657    def test_permutation_floating(self):
658        # Test that we generate all permutations of src/dst addresses.
659        # Assign two addresses to each end, and check for the expected states
660        srv_vnet = self.vnet_map["vnet2"]
661
662        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
663        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)
664
665        ToolsHelper.print_output("/sbin/pfctl -e")
666        ToolsHelper.pf_rules([
667            "block proto sctp",
668            "pass on lo",
669            "pass inet6 proto sctp to 2001:db8::0/64"])
670
671        # Sanity check, we can communicate with the primary address.
672        client = SCTPClient("2001:db8::3", 1234)
673        client.send(b"hello", 0)
674        rcvd = self.wait_object(srv_vnet.pipe)
675        print(rcvd)
676        assert rcvd['ppid'] == 0
677        assert rcvd['data'] == "hello"
678
679        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
680        states = ToolsHelper.get_output("/sbin/pfctl -ss")
681        print(states)
682        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
683        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
684        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
685        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)
686