import pytest
import ctypes
import socket
import ipaddress
import re
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate

import time

# Flag passed to sctp_sendmsg() when the caller does not ask for ordering.
SCTP_UNORDERED = 0x0400

# IPPROTO_SCTP level socket options used by SCTPClient.
SCTP_NODELAY = 0x00000004
SCTP_SET_PEER_PRIMARY_ADDR = 0x00000006
SCTP_PRIMARY_ADDR = 0x00000007

# Operation selectors for sctp_bindx().
SCTP_BINDX_ADD_ADDR = 0x00008001
SCTP_BINDX_REM_ADDR = 0x00008002

# Size the socket address storage inside sctp_setprim is padded out to.
SOCKADDR_STORAGE_SIZE = 128


class sockaddr_in(ctypes.Structure):
    """ctypes layout of an IPv4 socket address (BSD style, with sin_len)."""
    _fields_ = [
        ('sin_len', ctypes.c_uint8),
        ('sin_family', ctypes.c_uint8),
        ('sin_port', ctypes.c_uint16),
        ('sin_addr', ctypes.c_uint32),
        ('sin_zero', ctypes.c_int8 * 8)
    ]


class sockaddr_in6(ctypes.Structure):
    """ctypes layout of an IPv6 socket address (BSD style, with sin6_len)."""
    _fields_ = [
        ('sin6_len', ctypes.c_uint8),
        ('sin6_family', ctypes.c_uint8),
        ('sin6_port', ctypes.c_uint16),
        ('sin6_flowinfo', ctypes.c_uint32),
        ('sin6_addr', ctypes.c_uint8 * 16),
        ('sin6_scope_id', ctypes.c_uint32)
    ]


class sockaddr_storage(ctypes.Union):
    """Overlay of the two address layouts; padded to full size by its users."""
    _fields_ = [
        ("v4", sockaddr_in),
        ("v6", sockaddr_in6)
    ]


class sctp_sndrcvinfo(ctypes.Structure):
    """Per-message information filled in by sctp_recvmsg()."""
    _fields_ = [
        ('sinfo_stream', ctypes.c_uint16),
        ('sinfo_ssn', ctypes.c_uint16),
        ('sinfo_flags', ctypes.c_uint16),
        ('sinfo_ppid', ctypes.c_uint32),
        ('sinfo_context', ctypes.c_uint32),
        ('sinfo_timetolive', ctypes.c_uint32),
        ('sinfo_tsn', ctypes.c_uint32),
        ('sinfo_cumtsn', ctypes.c_uint32),
        ('sinfo_assoc_id', ctypes.c_uint32),
        # NOTE(review): the trailing members below follow the FreeBSD
        # netinet/sctp_uio.h definition as I remember it (two 16-bit key
        # fields plus a 92 byte reserve pad) -- confirm against the header.
        # They are never read here; they only make sure the C side has room
        # to write the complete structure.
        ('sinfo_keynumber', ctypes.c_uint16),
        ('sinfo_keynumber_valid', ctypes.c_uint16),
        ('sinfo_reserve_pad', ctypes.c_uint8 * 92),
    ]


class sctp_setprim(ctypes.Structure):
    """Argument for SCTP_PRIMARY_ADDR / SCTP_SET_PEER_PRIMARY_ADDR."""
    _fields_ = [
        ('ssp_addr', sockaddr_storage),
        # Pad the address out to SOCKADDR_STORAGE_SIZE bytes. The pad is
        # derived from the union size (which is the IPv6 size, not 16), so
        # that ssp_assoc_id really lands right after the address storage.
        ('ssp_pad', ctypes.c_int8 * (SOCKADDR_STORAGE_SIZE
                                     - ctypes.sizeof(sockaddr_storage))),
        ('ssp_assoc_id', ctypes.c_uint32),
        ('ssp_padding', ctypes.c_uint32)
    ]


def to_sockaddr(ip, port):
    """Build a ctypes socket address for an IP address string and port.

    ip: IPv4 or IPv6 address, in any form ipaddress.ip_address() accepts.
    port: port number in host byte order.

    Returns a sockaddr_in for IPv4 and a sockaddr_in6 for IPv6, with the
    length, family, port and address fields filled in.
    Raises ValueError (from ipaddress) if ip is not a valid address.
    """
    parsed = ipaddress.ip_address(ip)

    if parsed.version == 4:
        addr = sockaddr_in()
        addr.sin_len = ctypes.sizeof(addr)
        addr.sin_family = socket.AF_INET
        addr.sin_port = socket.htons(port)
        addr.sin_addr = socket.htonl(
            int.from_bytes(parsed.packed, byteorder='big'))
    else:
        assert parsed.version == 6

        addr = sockaddr_in6()
        addr.sin6_len = ctypes.sizeof(addr)
        addr.sin6_family = socket.AF_INET6
        addr.sin6_port = socket.htons(port)
        for i, octet in enumerate(parsed.packed):
            addr.sin6_addr[i] = octet

    return addr


class SCTPServer:
    """Minimal SCTP listener driven through libc via ctypes.

    Each received message is forwarded over the vnet pipe as a dict with
    the keys 'ppid', 'data' and 'len'.
    """

    def __init__(self, family, port=1234):
        """Create, bind (to the wildcard address) and listen on a socket.

        family: socket.AF_INET or socket.AF_INET6.
        port: port to listen on.
        Raises Exception if any of socket/bind/listen fails.
        """
        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)

        self._listen_fd = self._libc.socket(family, socket.SOCK_STREAM,
                                            socket.IPPROTO_SCTP)
        if self._listen_fd == -1:
            raise Exception("Failed to create socket")

        if family == socket.AF_INET:
            srvaddr = sockaddr_in()
            srvaddr.sin_len = ctypes.sizeof(srvaddr)
            srvaddr.sin_family = socket.AF_INET
            srvaddr.sin_port = socket.htons(port)
            srvaddr.sin_addr = socket.INADDR_ANY
        else:
            srvaddr = sockaddr_in6()
            srvaddr.sin6_len = ctypes.sizeof(srvaddr)
            srvaddr.sin6_family = family
            srvaddr.sin6_port = socket.htons(port)
            # Leave sin6_addr empty, because ANY is zero

        ret = self._libc.bind(self._listen_fd, ctypes.pointer(srvaddr),
                              ctypes.sizeof(srvaddr))
        if ret == -1:
            # Grab errno before close() gets a chance to overwrite it.
            err = ctypes.get_errno()
            self._close_listener()
            raise Exception("Failed to bind: %d" % err)

        ret = self._libc.listen(self._listen_fd, 2)
        if ret == -1:
            self._close_listener()
            raise Exception("Failed to listen")

    def _close_listener(self):
        """Release the listening socket after a failed setup step."""
        self._libc.close(self._listen_fd)
        self._listen_fd = -1

    def _to_string(self, buf):
        """Turn a receive buffer into a str, dropping trailing NUL bytes.

        Every byte maps to the code point of the same value (latin-1).
        """
        return bytes(buf).decode('latin-1').rstrip('\x00')

    def accept(self, vnet):
        """Accept one connection and relay its messages until it fails.

        vnet: object whose .pipe.send() receives one dict per message.
        Returns once sctp_recvmsg() reports an error; the accepted socket
        is closed on the way out.
        Raises Exception if accept() itself fails.
        """
        # None is passed as a real NULL pointer; a plain 0 would be passed
        # as a C int.
        fd = self._libc.accept(self._listen_fd, None, None)
        if fd < 0:
            raise Exception("Failed to accept")

        print("SCTPServer: connection opened")
        bufsize = 128
        try:
            while True:
                rcvinfo = sctp_sndrcvinfo()
                flags = ctypes.c_int()
                # A fresh, zeroed buffer per message, so _to_string() can
                # strip the unused tail.
                buf = ctypes.create_string_buffer(bufsize)

                # Receive a single message, and inform the other vnet
                # about it.
                ret = self._libc.sctp_recvmsg(
                    fd, ctypes.cast(buf, ctypes.c_void_p),
                    ctypes.c_size_t(bufsize), None, None,
                    ctypes.pointer(rcvinfo), ctypes.pointer(flags))
                if ret < 0:
                    print("SCTPServer: connection closed")
                    return
                if ret == 0:
                    continue

                rcvd = {}
                rcvd['ppid'] = socket.ntohl(rcvinfo.sinfo_ppid)
                rcvd['data'] = self._to_string(buf)
                rcvd['len'] = ret
                print(rcvd)
                vnet.pipe.send(rcvd)
        finally:
            # Don't leak one descriptor per accepted connection.
            self._libc.close(fd)


class SCTPClient:
    """Minimal one-to-one style SCTP client driven through libc via ctypes."""

    def __init__(self, ip, port=1234, fromaddr=None):
        """Open a socket, optionally bind it, and connect to ip:port.

        ip: peer address (IPv4 or IPv6 string); also selects the family.
        port: peer port.
        fromaddr: optional local address to bind to before connecting.
        Raises Exception if socket() or connect() fails, RuntimeError if
        bind() fails.
        """
        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)

        if ipaddress.ip_address(ip).version == 4:
            family = socket.AF_INET
        else:
            family = socket.AF_INET6

        self._fd = self._libc.socket(family, socket.SOCK_STREAM,
                                     socket.IPPROTO_SCTP)
        if self._fd == -1:
            raise Exception("Failed to open socket")

        if fromaddr is not None:
            addr = to_sockaddr(fromaddr, 0)

            ret = self._libc.bind(self._fd, ctypes.pointer(addr),
                                  ctypes.sizeof(addr))
            if ret != 0:
                err = ctypes.get_errno()
                print("bind() => %d" % err)
                self.close()
                raise RuntimeError("bind() failed: errno %d" % err)

        addr = to_sockaddr(ip, port)
        ret = self._libc.connect(self._fd, ctypes.pointer(addr),
                                 ctypes.sizeof(addr))
        if ret == -1:
            self.close()
            raise Exception("Failed to connect")

        # Enable NODELAY, because otherwise the sending host may wait for SACK
        # on a data chunk we've removed
        enable = ctypes.c_int(1)
        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
                                    SCTP_NODELAY, ctypes.pointer(enable),
                                    ctypes.sizeof(enable))
        if ret != 0:
            # Best effort: report it, but keep the connection usable.
            print("setsockopt(SCTP_NODELAY) errno %d" % ctypes.get_errno())

    def _set_primary(self, optname, addr):
        """Issue a sctp_setprim based setsockopt for addr.

        Returns 0 on success, otherwise the errno of the failed call.
        """
        setp = sctp_setprim()
        a = to_sockaddr(addr, 0)
        if isinstance(a, sockaddr_in):
            setp.ssp_addr.v4 = a
        else:
            assert isinstance(a, sockaddr_in6)
            setp.ssp_addr.v6 = a

        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP, optname,
                                    ctypes.pointer(setp), ctypes.sizeof(setp))
        if ret != 0:
            return ctypes.get_errno()
        return 0

    def newpeer(self, addr):
        """Apply the SCTP_PRIMARY_ADDR socket option with addr.

        Raises Exception carrying the errno if setsockopt() fails.
        """
        print("newpeer(%s)" % (addr))

        err = self._set_primary(SCTP_PRIMARY_ADDR, addr)
        if err != 0:
            print("errno %d" % err)
            raise Exception(err)

    def newprimary(self, addr):
        """Apply the SCTP_SET_PEER_PRIMARY_ADDR socket option with addr.

        Raises RuntimeError if setsockopt() fails.
        """
        print("newprimary(%s)" % (addr))

        # Strictly speaking needs to be struct sctp_setpeerprim, but that's
        # identical to sctp_setprim
        err = self._set_primary(SCTP_SET_PEER_PRIMARY_ADDR, addr)
        if err != 0:
            print("errno %d" % err)
            raise RuntimeError(
                "setsockopt(SCTP_SET_PEER_PRIMARY_ADDR) failed: errno %d"
                % err)

    def bindx(self, addr, add):
        """Add (add is true) or remove (add is false) a local address.

        Raises RuntimeError if sctp_bindx() fails.
        """
        print("bindx(%s, %s)" % (addr, add))

        addr = to_sockaddr(addr, 0)

        if add:
            flag = SCTP_BINDX_ADD_ADDR
        else:
            flag = SCTP_BINDX_REM_ADDR
        ret = self._libc.sctp_bindx(self._fd, ctypes.pointer(addr), 1, flag)
        if ret != 0:
            err = ctypes.get_errno()
            print("sctp_bindx() errno %d" % err)
            raise RuntimeError("sctp_bindx() failed: errno %d" % err)

    def send(self, buf, ppid, ordered=False):
        """Send one message.

        buf: payload as bytes.
        ppid: payload protocol identifier, in host byte order.
        ordered: when false (the default) SCTP_UNORDERED is set.
        Raises Exception if sctp_sendmsg() fails.
        """
        flags = 0 if ordered else SCTP_UNORDERED

        ppid = socket.htonl(ppid)
        # No destination address: the socket is already connected.
        ret = self._libc.sctp_sendmsg(self._fd, ctypes.c_char_p(buf),
                                      ctypes.c_size_t(len(buf)),
                                      None, 0, ppid, flags, 0, 0, 0)
        if ret < 0:
            raise Exception("Failed to send message")

    def close(self):
        """Close the socket; calling it again is a no-op."""
        if self._fd != -1:
            self._libc.close(self._fd)
        self._fd = -1


class TestSCTP(VnetTestTemplate):
    REQUIRED_MODULES = ["sctp", "pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet2_handler(self, vnet):
        # Give ourself a second IP address, for multihome testing
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.3/24" % ifname)

        # Start an SCTP server; it pipes the ppid + data of every message
        # back to the other vnet.
        srv = SCTPServer(socket.AF_INET, port=1234)
        while True:
            srv.accept(vnet)

    @pytest.mark.require_user("root")
    def test_multihome(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.0/24",
            "pass on lo"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Now change to a different peer address
        try:
            client.newpeer("192.0.2.2")
            client.send(b"world", 0)
            rcvd = self.wait_object(srv_vnet.pipe)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "world"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss")
            ToolsHelper.print_output("/sbin/pfctl -sr -vv")

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_multihome_asconf(self):
        srv_vnet = self.vnet_map["vnet2"]

        # Assign a second IP to ourselves
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.10/24"
            % self.vnet.iface_alias_map["if1"].name)
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp from 192.0.2.0/24"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234, "192.0.2.1")
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Now add our second address to the connection
        client.bindx("192.0.2.10", True)

        # We can still communicate
        client.send(b"world", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "world"

        # Now ask the peer to use our new address as its primary
        try:
            client.newprimary("192.0.2.10")
            client.send(b"!", 0)
            rcvd = self.wait_object(srv_vnet.pipe, 5)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "!"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Ensure we have the states we'd expect
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.10:.*192.0.2.3:1234", states)

        # Now remove 192.0.2.1 as an address
        client.bindx("192.0.2.1", False)

        # We can still communicate
        try:
            client.send(b"More data", 0)
            rcvd = self.wait_object(srv_vnet.pipe, 5)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "More data"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Verify that state is closing
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234.*SHUTDOWN", states)

    @pytest.mark.require_user("root")
    def test_permutation_if_bound(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp to 192.0.2.0/24"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_permutation_floating(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp to 192.0.2.0/24"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_disallow_related(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.3",
            "pass on lo"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # This shouldn't work
        success = False
        try:
            client.newpeer("192.0.2.2")
            client.send(b"world", 0)
            rcvd = self.wait_object(srv_vnet.pipe)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "world"
            success = True
        except Exception:
            # Any failure along the way is the expected outcome; don't
            # swallow KeyboardInterrupt/SystemExit though.
            success = False
        assert not success

        # Check that we have a state for 192.0.2.3, but not 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert not re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_allow_related(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.3 keep state (allow-related)",
            "pass on lo"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        success = False
        try:
            client.newpeer("192.0.2.2")
            client.send(b"world", 0)
            rcvd = self.wait_object(srv_vnet.pipe)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "world"
            success = True
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss")
            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
        assert success

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)


class TestSCTPv6(VnetTestTemplate):
    REQUIRED_MODULES = ["sctp", "pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        # Give ourself a second IP address, for multihome testing
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname)

        # Start an SCTP server; it pipes the ppid + data of every message
        # back to the other vnet.
        srv = SCTPServer(socket.AF_INET6, port=1234)
        while True:
            srv.accept(vnet)

    @pytest.mark.require_user("root")
    def test_multihome(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Now change to a different peer address
        try:
            client.newpeer("2001:db8::2")
            client.send(b"world", 0)
            rcvd = self.wait_object(srv_vnet.pipe)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "world"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Check that we have the expected states
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)

    @pytest.mark.require_user("root")
    def test_multihome_asconf(self):
        srv_vnet = self.vnet_map["vnet2"]

        # Assign a second IP to ourselves
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::10/64"
            % self.vnet.iface_alias_map["if1"].name)
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp from 2001:db8::/64"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234, "2001:db8::1")
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Now add our second address to the connection
        client.bindx("2001:db8::10", True)

        # We can still communicate
        client.send(b"world", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "world"

        # Now ask the peer to use our new address as its primary
        try:
            client.newprimary("2001:db8::10")
            client.send(b"!", 0)
            rcvd = self.wait_object(srv_vnet.pipe, 5)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "!"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Check that we have the expected states
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::10\[.*2001:db8::3\[1234\]", states)

        # Now remove 2001:db8::1 as an address
        client.bindx("2001:db8::1", False)

        # We can still communicate
        try:
            client.send(b"More data", 0)
            rcvd = self.wait_object(srv_vnet.pipe, 5)
            print(rcvd)
            assert rcvd['ppid'] == 0
            assert rcvd['data'] == "More data"
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Verify that the state is closing
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\].*SHUTDOWN", states)

    @pytest.mark.require_user("root")
    def test_permutation(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)

    @pytest.mark.require_user("root")
    def test_permutation_floating(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        client.send(b"hello", 0)
        rcvd = self.wait_object(srv_vnet.pipe)
        print(rcvd)
        assert rcvd['ppid'] == 0
        assert rcvd['data'] == "hello"

        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)