1import errno 2import ipaddress 3import socket 4import struct 5import time 6from ctypes import c_byte 7from ctypes import c_uint 8from ctypes import Structure 9 10import pytest 11from atf_python.sys.net.rtsock import SaHelper 12from atf_python.sys.net.tools import ToolsHelper 13from atf_python.sys.net.vnet import SingleVnetTestTemplate 14from atf_python.sys.net.vnet import VnetTestTemplate 15 16 17class In6Pktinfo(Structure): 18 _fields_ = [ 19 ("ipi6_addr", c_byte * 16), 20 ("ipi6_ifindex", c_uint), 21 ] 22 23 24class VerboseSocketServer: 25 def __init__(self, ip: str, port: int, ifname: str = None): 26 self.ip = ip 27 self.port = port 28 29 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 30 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) 31 addr = ipaddress.ip_address(ip) 32 if addr.is_link_local and ifname: 33 ifindex = socket.if_nametoindex(ifname) 34 addr_tuple = (ip, port, 0, ifindex) 35 elif addr.is_multicast and ifname: 36 ifindex = socket.if_nametoindex(ifname) 37 mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex) 38 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) 39 print("## JOINED group {} % {}".format(ip, ifname)) 40 addr_tuple = ("::", port, 0, ifindex) 41 else: 42 addr_tuple = (ip, port, 0, 0) 43 print("## Listening on [{}]:{}".format(addr_tuple[0], port)) 44 s.bind(addr_tuple) 45 self.socket = s 46 47 def recv(self): 48 # data = self.socket.recv(4096) 49 # print("RX: " + data) 50 data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128) 51 # Assume ancdata has just 1 item 52 info = In6Pktinfo.from_buffer_copy(ancdata[0][2]) 53 dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr) 54 dst_iface = socket.if_indextoname(info.ipi6_ifindex) 55 56 tx_obj = { 57 "data": data, 58 "src_ip": address[0], 59 "dst_ip": dst_ip, 60 "dst_iface": dst_iface, 61 } 62 return tx_obj 63 64 65class BaseTestIP6Ouput(VnetTestTemplate): 66 TOPOLOGY = { 67 "vnet1": {"ifaces": ["if1", "if2", "if3"]}, 68 "vnet2": {"ifaces": ["if1", "if2", "if3"]}, 69 "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]}, 70 "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]}, 71 "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]}, 72 } 73 DEFAULT_PORT = 45365 74 75 def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None): 76 """Generic listener that sends first received packet with metadata 77 back to the sender via pipw 78 """ 79 ll_data = ToolsHelper.get_linklocals() 80 # Start listener 81 ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname) 82 vnet.pipe.send(ll_data) 83 84 tx_obj = ss.recv() 85 tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias 86 vnet.pipe.send(tx_obj) 87 88 89class TestIP6Output(BaseTestIP6Ouput): 90 def vnet2_handler(self, vnet): 91 ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip) 92 self._vnet2_handler(vnet, ip, None) 93 94 @pytest.mark.require_user("root") 95 def test_output6_base(self): 96 """Tests simple UDP output""" 97 second_vnet = self.vnet_map["vnet2"] 98 99 # Pick target on if2 vnet2's end 100 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) 101 ip = str(ifaddr.ip) 102 103 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 104 data = bytes("AAAA", "utf-8") 105 print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT)) 106 107 # Wait for the child to become ready 108 self.wait_object(second_vnet.pipe) 109 s.sendto(data, (ip, self.DEFAULT_PORT)) 110 111 # Wait for the received object 112 rx_obj = self.wait_object(second_vnet.pipe) 113 assert rx_obj["dst_ip"] == ip 114 assert rx_obj["dst_iface_alias"] == "if2" 115 116 @pytest.mark.require_user("root") 117 def test_output6_nhop(self): 118 """Tests UDP output with custom nhop set""" 119 second_vnet = self.vnet_map["vnet2"] 120 121 # Pick target on if2 vnet2's end 122 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) 123 ip_dst = str(ifaddr.ip) 124 # Pick nexthop on if1 125 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1]) 126 ip_next = str(ifaddr.ip) 127 sin6_next = SaHelper.ip6_sa(ip_next, 0) 128 129 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) 130 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next) 131 132 # Wait for the child to become ready 133 self.wait_object(second_vnet.pipe) 134 data = bytes("AAAA", "utf-8") 135 s.sendto(data, (ip_dst, self.DEFAULT_PORT)) 136 137 # Wait for the received object 138 rx_obj = self.wait_object(second_vnet.pipe) 139 assert rx_obj["dst_ip"] == ip_dst 140 assert rx_obj["dst_iface_alias"] == "if1" 141 142 @pytest.mark.parametrize( 143 "params", 144 [ 145 # esrc: src-ip, if: src-interface, esrc: expected-src, 146 # eif: expected-rx-interface 147 pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"), 148 pytest.param( 149 {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"}, 150 id="iponly1", 151 ), 152 pytest.param( 153 { 154 "src": "2001:db8:c::1", 155 "if": "if3", 156 "ex": errno.EHOSTUNREACH, 157 }, 158 id="ipandif", 159 ), 160 pytest.param( 161 { 162 "src": "2001:db8:c::aaaa", 163 "ex": errno.EADDRNOTAVAIL, 164 }, 165 id="nolocalip", 166 ), 167 pytest.param( 168 {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame" 169 ), 170 ], 171 ) 172 @pytest.mark.require_user("root") 173 def test_output6_pktinfo(self, params): 174 """Tests simple UDP output""" 175 second_vnet = self.vnet_map["vnet2"] 176 vnet = self.vnet 177 178 # Pick target on if2 vnet2's end 179 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) 180 dst_ip = str(ifaddr.ip) 181 182 src_ip = params.get("src", "") 183 src_ifname = params.get("if", "") 184 expected_ip = params.get("esrc", "") 185 expected_ifname = params.get("eif", "") 186 errno = params.get("ex", 0) 187 188 pktinfo = In6Pktinfo() 189 if src_ip: 190 for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)): 191 pktinfo.ipi6_addr[i] = b 192 if src_ifname: 193 os_ifname = vnet.iface_alias_map[src_ifname].name 194 pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname) 195 196 # Wait for the child to become ready 197 self.wait_object(second_vnet.pipe) 198 data = bytes("AAAA", "utf-8") 199 200 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) 201 try: 202 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)) 203 aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)) 204 s.sendto(data, (dst_ip, self.DEFAULT_PORT)) 205 except OSError as e: 206 if not errno: 207 raise 208 assert e.errno == errno 209 print("Correctly raised {}".format(e)) 210 return 211 212 # Wait for the received object 213 rx_obj = self.wait_object(second_vnet.pipe) 214 215 assert rx_obj["dst_ip"] == dst_ip 216 if expected_ip: 217 assert rx_obj["src_ip"] == expected_ip 218 if expected_ifname: 219 assert rx_obj["dst_iface_alias"] == expected_ifname 220 221 222class TestIP6OutputLL(BaseTestIP6Ouput): 223 def vnet2_handler(self, vnet): 224 """Generic listener that sends first received packet with metadata 225 back to the sender via pipw 226 """ 227 os_ifname = vnet.iface_alias_map["if2"].name 228 ll_data = ToolsHelper.get_linklocals() 229 ll_ip, _ = ll_data[os_ifname][0] 230 self._vnet2_handler(vnet, ll_ip, os_ifname) 231 232 @pytest.mark.require_user("root") 233 def test_output6_linklocal(self): 234 """Tests simple UDP output""" 235 second_vnet = self.vnet_map["vnet2"] 236 237 # Wait for the child to become ready 238 ll_data = self.wait_object(second_vnet.pipe) 239 240 # Pick LL address on if2 vnet2's end 241 ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0] 242 # Get local interface scope 243 os_ifname = self.vnet.iface_alias_map["if2"].name 244 scopeid = socket.if_nametoindex(os_ifname) 245 246 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 247 data = bytes("AAAA", "utf-8") 248 target = (ip, self.DEFAULT_PORT, 0, scopeid) 249 print("## TX packet to {}%{},{}".format(ip, scopeid, target[1])) 250 251 s.sendto(data, target) 252 253 # Wait for the received object 254 rx_obj = self.wait_object(second_vnet.pipe) 255 assert rx_obj["dst_ip"] == ip 256 assert rx_obj["dst_iface_alias"] == "if2" 257 258 259class TestIP6OutputNhopLL(BaseTestIP6Ouput): 260 def vnet2_handler(self, vnet): 261 """Generic listener that sends first received packet with metadata 262 back to the sender via pipw 263 """ 264 ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip) 265 self._vnet2_handler(vnet, ip, None) 266 267 @pytest.mark.require_user("root") 268 def test_output6_nhop_linklocal(self): 269 """Tests UDP output with custom link-local nhop set""" 270 second_vnet = self.vnet_map["vnet2"] 271 272 # Wait for the child to become ready 273 ll_data = self.wait_object(second_vnet.pipe) 274 275 # Pick target on if2 vnet2's end 276 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) 277 ip_dst = str(ifaddr.ip) 278 # Pick nexthop on if1 279 ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0] 280 # Get local interfaces 281 os_ifname = self.vnet.iface_alias_map["if1"].name 282 scopeid = socket.if_nametoindex(os_ifname) 283 sin6_next = SaHelper.ip6_sa(ip_next, scopeid) 284 285 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) 286 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next) 287 288 data = bytes("AAAA", "utf-8") 289 s.sendto(data, (ip_dst, self.DEFAULT_PORT)) 290 291 # Wait for the received object 292 rx_obj = self.wait_object(second_vnet.pipe) 293 assert rx_obj["dst_ip"] == ip_dst 294 assert rx_obj["dst_iface_alias"] == "if1" 295 296 297class TestIP6OutputScope(BaseTestIP6Ouput): 298 def vnet2_handler(self, vnet): 299 """Generic listener that sends first received packet with metadata 300 back to the sender via pipw 301 """ 302 bind_ip, bind_ifp = self.wait_object(vnet.pipe) 303 if bind_ip is None: 304 os_ifname = vnet.iface_alias_map[bind_ifp].name 305 ll_data = ToolsHelper.get_linklocals() 306 bind_ip, _ = ll_data[os_ifname][0] 307 if bind_ifp is not None: 308 bind_ifp = vnet.iface_alias_map[bind_ifp].name 309 print("## BIND {}%{}".format(bind_ip, bind_ifp)) 310 self._vnet2_handler(vnet, bind_ip, bind_ifp) 311 312 @pytest.mark.parametrize( 313 "params", 314 [ 315 # sif/dif: source/destination interface (for link-local addr) 316 # sip/dip: source/destination ip (for non-LL addr) 317 # ex: OSError errno that sendto() must raise 318 pytest.param({"sif": "if2", "dif": "if2"}, id="same"), 319 pytest.param( 320 { 321 "sif": "if1", 322 "dif": "if2", 323 "ex": errno.EHOSTUNREACH, 324 }, 325 id="ll_differentif1", 326 ), 327 pytest.param( 328 { 329 "sif": "if1", 330 "dip": "2001:db8:b::2", 331 "ex": errno.EHOSTUNREACH, 332 }, 333 id="ll_differentif2", 334 ), 335 pytest.param( 336 { 337 "sip": "2001:db8:a::1", 338 "dif": "if2", 339 }, 340 id="gu_to_ll", 341 ), 342 ], 343 ) 344 @pytest.mark.require_user("root") 345 def test_output6_linklocal_scope(self, params): 346 """Tests simple UDP output""" 347 second_vnet = self.vnet_map["vnet2"] 348 349 src_ifp = params.get("sif") 350 src_ip = params.get("sip") 351 dst_ifp = params.get("dif") 352 dst_ip = params.get("dip") 353 errno = params.get("ex", 0) 354 355 # Sent ifp/IP to bind on 356 second_vnet = self.vnet_map["vnet2"] 357 second_vnet.pipe.send((dst_ip, dst_ifp)) 358 359 # Wait for the child to become ready 360 ll_data = self.wait_object(second_vnet.pipe) 361 362 if dst_ip is None: 363 # Pick LL address on dst_ifp vnet2's end 364 dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0] 365 # Get local interface scope 366 os_ifname = self.vnet.iface_alias_map[dst_ifp].name 367 scopeid = socket.if_nametoindex(os_ifname) 368 target = (dst_ip, self.DEFAULT_PORT, 0, scopeid) 369 else: 370 target = (dst_ip, self.DEFAULT_PORT, 0, 0) 371 372 # Bind 373 if src_ip is None: 374 ll_data = ToolsHelper.get_linklocals() 375 os_ifname = self.vnet.iface_alias_map[src_ifp].name 376 src_ip, _ = ll_data[os_ifname][0] 377 scopeid = socket.if_nametoindex(os_ifname) 378 src = (src_ip, self.DEFAULT_PORT, 0, scopeid) 379 else: 380 src = (src_ip, self.DEFAULT_PORT, 0, 0) 381 382 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 383 s.bind(src) 384 data = bytes("AAAA", "utf-8") 385 print("## TX packet {} -> {}".format(src, target)) 386 387 try: 388 s.sendto(data, target) 389 except OSError as e: 390 if not errno: 391 raise 392 assert e.errno == errno 393 print("Correctly raised {}".format(e)) 394 return 395 396 # Wait for the received object 397 rx_obj = self.wait_object(second_vnet.pipe) 398 assert rx_obj["dst_ip"] == dst_ip 399 assert rx_obj["src_ip"] == src_ip 400 # assert rx_obj["dst_iface_alias"] == "if2" 401 402 403class TestIP6OutputMulticast(BaseTestIP6Ouput): 404 def vnet2_handler(self, vnet): 405 group = self.wait_object(vnet.pipe) 406 os_ifname = vnet.iface_alias_map["if2"].name 407 self._vnet2_handler(vnet, group, os_ifname) 408 409 @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"]) 410 @pytest.mark.require_user("root") 411 def test_output6_multicast(self, group_scope): 412 """Tests simple UDP output""" 413 second_vnet = self.vnet_map["vnet2"] 414 415 group = "{}::3456".format(group_scope) 416 second_vnet.pipe.send(group) 417 418 # Pick target on if2 vnet2's end 419 ip = group 420 os_ifname = self.vnet.iface_alias_map["if2"].name 421 ifindex = socket.if_nametoindex(os_ifname) 422 optval = struct.pack("I", ifindex) 423 424 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 425 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval) 426 427 data = bytes("AAAA", "utf-8") 428 429 # Wait for the child to become ready 430 self.wait_object(second_vnet.pipe) 431 432 print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT)) 433 s.sendto(data, (ip, self.DEFAULT_PORT)) 434 435 # Wait for the received object 436 rx_obj = self.wait_object(second_vnet.pipe) 437 assert rx_obj["dst_ip"] == ip 438 assert rx_obj["dst_iface_alias"] == "if2" 439 440 441class TestIP6OutputLoopback(SingleVnetTestTemplate): 442 IPV6_PREFIXES = ["2001:db8:a::1/64"] 443 DEFAULT_PORT = 45365 444 445 @pytest.mark.parametrize( 446 "source_validation", 447 [ 448 pytest.param(0, id="no_sav"), 449 pytest.param(1, id="sav"), 450 ], 451 ) 452 @pytest.mark.parametrize("scope", ["gu", "ll", "lo"]) 453 def test_output6_self_tcp(self, scope, source_validation): 454 """Tests IPv6 TCP connection to the local IPv6 address""" 455 456 ToolsHelper.set_sysctl( 457 "net.inet6.ip6.source_address_validation", source_validation 458 ) 459 460 if scope == "gu": 461 ip = "2001:db8:a::1" 462 addr_tuple = (ip, self.DEFAULT_PORT) 463 elif scope == "ll": 464 os_ifname = self.vnet.iface_alias_map["if1"].name 465 ifindex = socket.if_nametoindex(os_ifname) 466 ll_data = ToolsHelper.get_linklocals() 467 ip, _ = ll_data[os_ifname][0] 468 addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex) 469 elif scope == "lo": 470 ip = "::1" 471 ToolsHelper.get_output("route add -6 ::1/128 -iface lo0") 472 ifindex = socket.if_nametoindex("lo0") 473 addr_tuple = (ip, self.DEFAULT_PORT) 474 else: 475 assert 0 == 1 476 print("address: {}".format(addr_tuple)) 477 478 start = time.perf_counter() 479 ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) 480 ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) 481 ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 482 ss.bind(addr_tuple) 483 ss.listen() 484 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) 485 s.settimeout(2.0) 486 s.connect(addr_tuple) 487 conn, from_addr = ss.accept() 488 duration = time.perf_counter() - start 489 490 assert from_addr[0] == ip 491 assert duration < 1.0 492 493 @pytest.mark.parametrize( 494 "source_validation", 495 [ 496 pytest.param(0, id="no_sav"), 497 pytest.param(1, id="sav"), 498 ], 499 ) 500 @pytest.mark.parametrize("scope", ["gu", "ll", "lo"]) 501 def test_output6_self_udp(self, scope, source_validation): 502 """Tests IPv6 UDP connection to the local IPv6 address""" 503 504 ToolsHelper.set_sysctl( 505 "net.inet6.ip6.source_address_validation", source_validation 506 ) 507 508 if scope == "gu": 509 ip = "2001:db8:a::1" 510 addr_tuple = (ip, self.DEFAULT_PORT) 511 elif scope == "ll": 512 os_ifname = self.vnet.iface_alias_map["if1"].name 513 ifindex = socket.if_nametoindex(os_ifname) 514 ll_data = ToolsHelper.get_linklocals() 515 ip, _ = ll_data[os_ifname][0] 516 addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex) 517 elif scope == "lo": 518 ip = "::1" 519 ToolsHelper.get_output("route add -6 ::1/128 -iface lo0") 520 ifindex = socket.if_nametoindex("lo0") 521 addr_tuple = (ip, self.DEFAULT_PORT) 522 else: 523 assert 0 == 1 524 print("address: {}".format(addr_tuple)) 525 526 start = time.perf_counter() 527 ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) 528 ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) 529 ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 530 ss.bind(addr_tuple) 531 ss.listen() 532 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) 533 s.settimeout(2.0) 534 s.connect(addr_tuple) 535 conn, from_addr = ss.accept() 536 duration = time.perf_counter() - start 537 538 assert from_addr[0] == ip 539 assert duration < 1.0 540