"""pf SCTP tests.

The tests open SCTP associations between two vnets through pf and then
inspect the resulting pf state table.  SCTP specific socket calls that the
Python ``socket`` module does not expose (sctp_bindx, sctp_sendmsg,
sctp_recvmsg and the SCTP socket options) are reached through ctypes calls
into ``libc.so.7``.
"""

import ctypes
import ipaddress
import re
import socket
import time

import pytest

from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate

# sinfo_flags value requesting unordered delivery.
SCTP_UNORDERED = 0x0400

# IPPROTO_SCTP level socket options.
SCTP_NODELAY = 0x00000004
SCTP_SET_PEER_PRIMARY_ADDR = 0x00000006
SCTP_PRIMARY_ADDR = 0x00000007

# sctp_bindx() flags.
SCTP_BINDX_ADD_ADDR = 0x00008001
SCTP_BINDX_REM_ADDR = 0x00008002


class sockaddr_in(ctypes.Structure):
    """ctypes mirror of the BSD-style ``struct sockaddr_in``."""
    _fields_ = [
        ('sin_len', ctypes.c_uint8),
        ('sin_family', ctypes.c_uint8),
        ('sin_port', ctypes.c_uint16),
        ('sin_addr', ctypes.c_uint32),
        ('sin_zero', ctypes.c_int8 * 8)
    ]


class sockaddr_in6(ctypes.Structure):
    """ctypes mirror of the BSD-style ``struct sockaddr_in6``."""
    _fields_ = [
        ('sin6_len', ctypes.c_uint8),
        ('sin6_family', ctypes.c_uint8),
        ('sin6_port', ctypes.c_uint16),
        ('sin6_flowinfo', ctypes.c_uint32),
        ('sin6_addr', ctypes.c_uint8 * 16),
        ('sin6_scope_id', ctypes.c_uint32)
    ]


class sockaddr_storage(ctypes.Union):
    """Either an IPv4 or an IPv6 socket address, overlaid at offset 0."""
    _fields_ = [
        ("v4", sockaddr_in),
        ("v6", sockaddr_in6)
    ]


class sctp_sndrcvinfo(ctypes.Structure):
    """Per-message metadata filled in by sctp_recvmsg()."""
    _fields_ = [
        ('sinfo_stream', ctypes.c_uint16),
        ('sinfo_ssn', ctypes.c_uint16),
        ('sinfo_flags', ctypes.c_uint16),
        ('sinfo_ppid', ctypes.c_uint32),
        ('sinfo_context', ctypes.c_uint32),
        ('sinfo_timetolive', ctypes.c_uint32),
        ('sinfo_tsn', ctypes.c_uint32),
        ('sinfo_cumtsn', ctypes.c_uint32),
        ('sinfo_assoc_id', ctypes.c_uint32),
        # NOTE(review): FreeBSD's struct sctp_sndrcvinfo carries further
        # trailing members (key number fields and reserved padding).  They
        # are mirrored here so the buffer handed to sctp_recvmsg() is never
        # smaller than what is written into it; confirm the exact sizes
        # against netinet/sctp_uio.h.  None of them is read by this file.
        ('sinfo_keynumber', ctypes.c_uint16),
        ('sinfo_keynumber_valid', ctypes.c_uint16),
        ('_reserve_pad', ctypes.c_uint8 * 92),
    ]


class sctp_setprim(ctypes.Structure):
    """Argument of SCTP_PRIMARY_ADDR / SCTP_SET_PEER_PRIMARY_ADDR."""
    _fields_ = [
        ('ssp_addr', sockaddr_storage),
        # Pad the address out to 128 bytes (the size of the C
        # struct sockaddr_storage) so that ssp_assoc_id lands at offset 128.
        ('ssp_pad', ctypes.c_int8 * (128 - ctypes.sizeof(sockaddr_storage))),
        ('ssp_assoc_id', ctypes.c_uint32),
        ('ssp_padding', ctypes.c_uint32)
    ]


def to_sockaddr(ip, port):
    """Build a ctypes socket address for ``ip`` and ``port``.

    :param ip: IPv4 or IPv6 address, in any form ipaddress.ip_address()
        accepts.
    :param port: port number in host byte order.
    :returns: a populated sockaddr_in or sockaddr_in6, depending on the
        address family of ``ip``.
    :raises ValueError: if ``ip`` is not a valid address.
    """
    ip = ipaddress.ip_address(ip)

    if ip.version == 4:
        addr = sockaddr_in()
        addr.sin_len = ctypes.sizeof(addr)
        addr.sin_family = socket.AF_INET
        addr.sin_port = socket.htons(port)
        # The field is a native integer, so store the network-order value.
        addr.sin_addr = socket.htonl(int.from_bytes(ip.packed, byteorder='big'))
    else:
        assert ip.version == 6

        addr = sockaddr_in6()
        addr.sin6_len = ctypes.sizeof(addr)
        addr.sin6_family = socket.AF_INET6
        addr.sin6_port = socket.htons(port)
        for i, octet in enumerate(ip.packed):
            addr.sin6_addr[i] = octet

    return addr


def _send_and_expect(test, client, pipe, payload, *wait_args):
    """Send ``payload`` (PPID 0) and check that the server reports it.

    :param test: the running test instance; its wait_object() is used to
        read the server's report.
    :param client: the SCTPClient to send with.
    :param pipe: pipe on which the server side reports received messages.
    :param payload: bytes to send.
    :param wait_args: extra positional arguments for wait_object(), e.g. a
        timeout.
    """
    client.send(payload, 0)
    rcvd = test.wait_object(pipe, *wait_args)
    print(rcvd)
    assert rcvd['ppid'] == 0
    assert rcvd['data'] == payload.decode()


class SCTPServer:
    """Minimal one-to-one style SCTP server, driven through libc."""

    def __init__(self, family, port=1234):
        """Create a socket bound to the wildcard address and listen on it.

        :param family: socket.AF_INET or socket.AF_INET6.
        :param port: port to listen on, in host byte order.
        :raises Exception: if socket(), bind() or listen() fails.
        """
        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)

        self._listen_fd = self._libc.socket(family, socket.SOCK_STREAM,
                                            socket.IPPROTO_SCTP)
        if self._listen_fd == -1:
            raise Exception("Failed to create socket")

        if family == socket.AF_INET:
            srvaddr = sockaddr_in()
            srvaddr.sin_len = ctypes.sizeof(srvaddr)
            srvaddr.sin_family = socket.AF_INET
            srvaddr.sin_port = socket.htons(port)
            srvaddr.sin_addr = socket.INADDR_ANY
        else:
            srvaddr = sockaddr_in6()
            srvaddr.sin6_len = ctypes.sizeof(srvaddr)
            srvaddr.sin6_family = family
            srvaddr.sin6_port = socket.htons(port)
            # Leave sin6_addr empty, because ANY is zero

        ret = self._libc.bind(self._listen_fd, ctypes.pointer(srvaddr),
                              ctypes.sizeof(srvaddr))
        if ret == -1:
            raise Exception("Failed to bind: %d" % ctypes.get_errno())

        ret = self._libc.listen(self._listen_fd, 2)
        if ret == -1:
            raise Exception("Failed to listen")

    def _to_string(self, buf):
        """Turn a buffer of single bytes into a str, minus trailing NULs."""
        text = ''.join(chr(int.from_bytes(c, byteorder='big')) for c in buf)
        return text.rstrip('\x00')

    def accept(self, vnet):
        """Accept one connection and relay its messages over ``vnet.pipe``.

        Every received message is sent as a dict with the keys 'ppid',
        'data' and 'len'.  Returns once sctp_recvmsg() reports an error;
        the accepted socket is closed on the way out.

        :raises Exception: if accept() fails.
        """
        # None is passed as a NULL pointer: the peer address is not wanted.
        fd = self._libc.accept(self._listen_fd, None, None)
        if fd < 0:
            raise Exception("Failed to accept")

        print("SCTPServer: connection opened")
        bufsize = 128
        try:
            while True:
                rcvinfo = sctp_sndrcvinfo()
                flags = ctypes.c_int()
                buf = ctypes.create_string_buffer(bufsize)

                # Receive a single message, and inform the other vnet about it.
                ret = self._libc.sctp_recvmsg(fd, ctypes.cast(buf, ctypes.c_void_p),
                                              bufsize, None, None,
                                              ctypes.pointer(rcvinfo),
                                              ctypes.pointer(flags))
                if ret < 0:
                    print("SCTPServer: connection closed")
                    return
                if ret == 0:
                    # NOTE(review): a zero return is retried rather than
                    # treated as end-of-stream; confirm this cannot spin.
                    continue

                rcvd = {}
                rcvd['ppid'] = socket.ntohl(rcvinfo.sinfo_ppid)
                rcvd['data'] = self._to_string(buf)
                rcvd['len'] = ret
                print(rcvd)
                vnet.pipe.send(rcvd)
        finally:
            # The caller accepts in an endless loop; don't leak one
            # descriptor per connection.
            self._libc.close(fd)


class SCTPClient:
    """Minimal one-to-one style SCTP client, driven through libc."""

    def __init__(self, ip, port=1234, fromaddr=None):
        """Connect to ``ip``:``port``.

        :param ip: IPv4 or IPv6 address of the server.
        :param port: server port, in host byte order.
        :param fromaddr: optional local address to bind to before
            connecting.
        :raises Exception: if socket(), bind() or connect() fails.
        """
        self._libc = ctypes.CDLL("libc.so.7", use_errno=True)

        if ipaddress.ip_address(ip).version == 4:
            family = socket.AF_INET
        else:
            family = socket.AF_INET6

        self._fd = self._libc.socket(family, socket.SOCK_STREAM,
                                     socket.IPPROTO_SCTP)
        if self._fd == -1:
            raise Exception("Failed to open socket")

        if fromaddr is not None:
            addr = to_sockaddr(fromaddr, 0)

            ret = self._libc.bind(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
            if ret != 0:
                # Read errno before close() gets a chance to change it.
                err = ctypes.get_errno()
                self.close()
                raise Exception("bind() => %d" % err)

        addr = to_sockaddr(ip, port)
        ret = self._libc.connect(self._fd, ctypes.pointer(addr), ctypes.sizeof(addr))
        if ret == -1:
            self.close()
            raise Exception("Failed to connect")

        # Enable NODELAY, because otherwise the sending host may wait for SACK
        # on a data chunk we've removed
        # The result is deliberately not checked (best effort).
        enable = ctypes.c_int(1)
        self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP,
                              SCTP_NODELAY, ctypes.pointer(enable), 4)

    def _set_primary(self, optname, addr):
        """Issue the sctp_setprim based socket option ``optname`` for ``addr``.

        :raises Exception: carrying the errno if setsockopt() fails.
        """
        setp = sctp_setprim()
        a = to_sockaddr(addr, 0)
        if isinstance(a, sockaddr_in):
            setp.ssp_addr.v4 = a
        else:
            assert isinstance(a, sockaddr_in6)
            setp.ssp_addr.v6 = a

        ret = self._libc.setsockopt(self._fd, socket.IPPROTO_SCTP, optname,
                                    ctypes.pointer(setp), ctypes.sizeof(setp))
        if ret != 0:
            err = ctypes.get_errno()
            print("errno %d" % err)
            raise Exception(err)

    def newpeer(self, addr):
        """Set ``addr`` via the SCTP_PRIMARY_ADDR socket option.

        :raises Exception: carrying the errno if setsockopt() fails.
        """
        print("newpeer(%s)" % (addr))
        self._set_primary(SCTP_PRIMARY_ADDR, addr)

    def newprimary(self, addr):
        """Set ``addr`` via the SCTP_SET_PEER_PRIMARY_ADDR socket option.

        :raises Exception: carrying the errno if setsockopt() fails.
        """
        print("newprimary(%s)" % (addr))

        # Strictly speaking needs to be struct sctp_setpeerprim, but that's
        # identical to sctp_setprim
        self._set_primary(SCTP_SET_PEER_PRIMARY_ADDR, addr)

    def bindx(self, addr, add):
        """Add ``addr`` to the socket's bound addresses, or remove it.

        :param addr: the local address.
        :param add: True to add the address, False to remove it.
        :raises Exception: if sctp_bindx() fails.
        """
        print("bindx(%s, %s)" % (addr, add))

        addr = to_sockaddr(addr, 0)

        flag = SCTP_BINDX_ADD_ADDR if add else SCTP_BINDX_REM_ADDR
        ret = self._libc.sctp_bindx(self._fd, ctypes.pointer(addr), 1, flag)
        if ret != 0:
            err = ctypes.get_errno()
            print("sctp_bindx() errno %d" % err)
            raise Exception("sctp_bindx() errno %d" % err)

    def send(self, buf, ppid, ordered=False):
        """Send ``buf`` as one message.

        :param buf: bytes to send.
        :param ppid: payload protocol identifier, in host byte order.
        :param ordered: request ordered delivery; unordered by default.
        :raises Exception: if sctp_sendmsg() fails.
        """
        flags = 0 if ordered else SCTP_UNORDERED

        ppid = socket.htonl(ppid)
        ret = self._libc.sctp_sendmsg(self._fd, ctypes.c_char_p(buf), len(buf),
                                      ctypes.c_void_p(0), 0, ppid, flags, 0, 0, 0)
        if ret < 0:
            raise Exception("Failed to send message")

    def close(self):
        """Close the socket."""
        self._libc.close(self._fd)
        self._fd = -1


class TestSCTP(VnetTestTemplate):
    """IPv4 tests: the client runs in vnet1, the server in vnet2."""

    REQUIRED_MODULES = ["sctp", "pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet2_handler(self, vnet):
        # Give ourself a second IP address, for multihome testing
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.3/24" % ifname)

        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
        srv = SCTPServer(socket.AF_INET, port=1234)
        while True:
            srv.accept(vnet)

    @pytest.mark.require_user("root")
    def test_multihome(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.0/24",
            "pass on lo"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        try:
            client.newpeer("192.0.2.2")
            _send_and_expect(self, client, srv_vnet.pipe, b"world")
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss")
            ToolsHelper.print_output("/sbin/pfctl -sr -vv")

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_multihome_asconf(self):
        srv_vnet = self.vnet_map["vnet2"]

        # Assign a second IP to ourselves
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.10/24"
                                 % self.vnet.iface_alias_map["if1"].name)
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp from 192.0.2.0/24"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234, "192.0.2.1")
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Now add our second address to the connection
        client.bindx("192.0.2.10", True)

        # We can still communicate
        _send_and_expect(self, client, srv_vnet.pipe, b"world")

        # Now change to a different peer address
        try:
            client.newprimary("192.0.2.10")
            _send_and_expect(self, client, srv_vnet.pipe, b"!", 5)
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Ensure we have the states we'd expect
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.10:.*192.0.2.3:1234", states)

        # Now remove 192.0.2.1 as an address
        client.bindx("192.0.2.1", False)

        # We can still communicate
        try:
            _send_and_expect(self, client, srv_vnet.pipe, b"More data", 5)
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Verify that state is closing
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234.*SHUTDOWN", states)

    @pytest.mark.require_user("root")
    def test_permutation_if_bound(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp to 192.0.2.0/24"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.4:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_permutation_floating(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.4/24" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp to 192.0.2.0/24"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1, but also to 192.0.2.4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)
        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.3:1234", states)
        assert re.search(r"all sctp 192.0.2.4:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_limit_addresses(self):
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        for i in range(0, 16):
            ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.%d/24" % (ifname, 4 + i))

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet proto sctp to 192.0.2.0/24"])

        # Give the server some time to come up
        time.sleep(3)

        # Set up a connection, which will try to create states for all addresses
        # we have assigned
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # But the number should be limited to 9 (original + 8 extra)
        states = ToolsHelper.get_output("/sbin/pfctl -ss | grep 192.0.2.2")
        print(states)
        assert states.count('\n') <= 9

    @pytest.mark.require_user("root")
    def test_disallow_related(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.3",
            "pass on lo"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # This shouldn't work
        success = False
        try:
            client.newpeer("192.0.2.2")
            _send_and_expect(self, client, srv_vnet.pipe, b"world")
            success = True
        except Exception:
            # Any failure (socket option, timeout, mismatch) is the
            # expected outcome here.
            success = False
        assert not success

        # Check that we have a state for 192.0.2.3, but not 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert not re.search(r"all sctp 192.0.2.1:.*192.0.2.2:1234", states)

    @pytest.mark.require_user("root")
    def test_allow_related(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass inet proto sctp to 192.0.2.3 keep state (allow-related)",
            "pass on lo"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("192.0.2.3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        success = False
        try:
            client.newpeer("192.0.2.2")
            _send_and_expect(self, client, srv_vnet.pipe, b"world")
            success = True
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss")
            ToolsHelper.print_output("/sbin/pfctl -sr -vv")
        assert success

        # Check that we have a state for 192.0.2.3 and 192.0.2.2 to 192.0.2.1
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.3:1234", states)
        assert re.search(r"epair.*sctp 192.0.2.1:.*192.0.2.2:1234", states)


class TestSCTP_SRV(VnetTestTemplate):
    """Tests in which pf is enabled on the server side (vnet2)."""

    REQUIRED_MODULES = ["sctp", "pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
    }

    def vnet2_handler(self, vnet):
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "pass inet proto sctp",
            "pass on lo"])

        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
        srv = SCTPServer(socket.AF_INET, port=1234)
        while True:
            srv.accept(vnet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_initiate_tag_check(self):
        # Ensure we don't send ABORTs in response to the other end's INIT_ACK
        # That'd interfere with our test.
        ToolsHelper.print_output("/sbin/sysctl net.inet.sctp.blackhole=2")

        import scapy.all as sp

        # Send an INIT, then another INIT with the same initiate tag, then an
        # INIT with a different initiate tag.  Expect an init ack carrying
        # the matching tag every time.
        for init_tag in (1, 1, 42):
            packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
                / sp.SCTP(sport=1234, dport=1234) \
                / sp.SCTPChunkInit(init_tag=init_tag, n_in_streams=1,
                                   n_out_streams=1, a_rwnd=1500)
            packet.show()

            r = sp.sr1(packet, timeout=3)
            assert r
            r.show()
            assert r.getlayer(sp.SCTP)
            assert r.getlayer(sp.SCTPChunkInitAck)
            assert r.getlayer(sp.SCTP).tag == init_tag

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_many_add_ip(self):
        import scapy.all as sp

        DEPTH = 90
        params = [sp.SCTPChunkParamAddIPAddr(len=(DEPTH - i) * 8)
                  for i in range(0, DEPTH)]
        packet = sp.IP(src="192.0.2.1", dst="192.0.2.2") \
            / sp.SCTP(sport=4321, dport=1234) \
            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1,
                               a_rwnd=1500, params=params)
        packet.show()
        sp.hexdump(packet)
        print("len %d" % len(packet))

        r = sp.sr1(packet, timeout=3)
        # We should not get a reply to this
        if r:
            r.show()
        assert not r


class TestSCTPv6(VnetTestTemplate):
    """IPv6 tests: the client runs in vnet1, the server in vnet2."""

    REQUIRED_MODULES = ["sctp", "pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        # Give ourself a second IP address, for multihome testing
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::3/64" % ifname)

        # Start an SCTP server process, pipe the ppid + data back to the other vnet?
        srv = SCTPServer(socket.AF_INET6, port=1234)
        while True:
            srv.accept(vnet)

    @pytest.mark.require_user("root")
    def test_multihome(self):
        srv_vnet = self.vnet_map["vnet2"]

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Now change to a different peer address
        try:
            client.newpeer("2001:db8::2")
            _send_and_expect(self, client, srv_vnet.pipe, b"world")
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Check that we have the expected states
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)

    @pytest.mark.require_user("root")
    def test_multihome_asconf(self):
        srv_vnet = self.vnet_map["vnet2"]

        # Assign a second IP to ourselves
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::10/64"
                                 % self.vnet.iface_alias_map["if1"].name)
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp from 2001:db8::/64"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234, "2001:db8::1")
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Now add our second address to the connection
        client.bindx("2001:db8::10", True)

        # We can still communicate
        _send_and_expect(self, client, srv_vnet.pipe, b"world")

        # Now change to a different peer address
        try:
            client.newprimary("2001:db8::10")
            _send_and_expect(self, client, srv_vnet.pipe, b"!", 5)
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Check that we have the expected states
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::10\[.*2001:db8::3\[1234\]", states)

        # Now remove 2001:db8::1 as an address
        client.bindx("2001:db8::1", False)

        # We can still communicate
        try:
            _send_and_expect(self, client, srv_vnet.pipe, b"More data", 5)
        finally:
            # Debug output
            ToolsHelper.print_output("/sbin/pfctl -ss -vv")

        # Verify that the state is closing
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\].*SHUTDOWN", states)

    @pytest.mark.require_user("root")
    def test_permutation(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "set state-policy if-bound",
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"epair.*sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)

    @pytest.mark.require_user("root")
    def test_permutation_floating(self):
        # Test that we generate all permutations of src/dst addresses.
        # Assign two addresses to each end, and check for the expected states
        srv_vnet = self.vnet_map["vnet2"]

        ifname = self.vnet_map["vnet1"].iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/ifconfig %s inet6 alias 2001:db8::4/64" % ifname)

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block proto sctp",
            "pass on lo",
            "pass inet6 proto sctp to 2001:db8::0/64"])

        # Give the server some time to come up
        time.sleep(3)

        # Sanity check, we can communicate with the primary address.
        client = SCTPClient("2001:db8::3", 1234)
        _send_and_expect(self, client, srv_vnet.pipe, b"hello")

        # Check that we have a state for 2001:db8::3 and 2001:db8::2 to 2001:db8::1, but also to 2001:db8::4
        states = ToolsHelper.get_output("/sbin/pfctl -ss")
        print(states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::1\[.*2001:db8::3\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::2\[1234\]", states)
        assert re.search(r"all sctp 2001:db8::4\[.*2001:db8::3\[1234\]", states)