import errno
import ipaddress
import socket
import struct
import time
from ctypes import c_byte
from ctypes import c_uint
from ctypes import Structure

import pytest
from atf_python.sys.net.rtsock import SaHelper
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import run_cmd
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.net.vnet import VnetTestTemplate


class In6Pktinfo(Structure):
    """Binary layout of the IPV6_PKTINFO payload.

    A 16-byte IPv6 address followed by an unsigned interface index.  It is
    serialized with ``bytes()`` when setting the IPV6_PKTINFO socket option
    and parsed with ``from_buffer_copy()`` from received ancillary data.
    """

    _fields_ = [
        ("ipi6_addr", c_byte * 16),
        ("ipi6_ifindex", c_uint),
    ]


class VerboseSocketServer:
    """UDP/IPv6 listener that reports per-packet destination metadata.

    The socket enables IPV6_RECVPKTINFO so that recv() can tell which
    destination address and which interface each datagram arrived on.
    """

    def __init__(self, ip: str, port: int, ifname: str = None):
        """Create the socket and bind it.

        ip: address to listen on; a multicast group is joined on `ifname`
            and the socket is then bound to "::".
        port: UDP port to bind.
        ifname: OS interface name used as scope for link-local addresses
            and as the interface for multicast group membership.
        """
        self.ip = ip
        self.port = port

        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            addr = ipaddress.ip_address(ip)
            if addr.is_link_local and ifname:
                ifindex = socket.if_nametoindex(ifname)
                addr_tuple = (ip, port, 0, ifindex)
            elif addr.is_multicast and ifname:
                ifindex = socket.if_nametoindex(ifname)
                # Group address followed by the interface index
                mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack(
                    "I", ifindex
                )
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
                print("## JOINED group {} % {}".format(ip, ifname))
                addr_tuple = ("::", port, 0, ifindex)
            else:
                addr_tuple = (ip, port, 0, 0)
            print("## Listening on [{}]:{}".format(addr_tuple[0], port))
            s.bind(addr_tuple)
        except Exception:
            # Do not leak the descriptor if setup fails half-way
            s.close()
            raise
        self.socket = s

    def close(self):
        """Close the listening socket."""
        self.socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def recv(self):
        """Receive one datagram and return it with its metadata.

        Returns a dict with keys "data", "src_ip", "dst_ip" and "dst_iface"
        (OS interface name the packet was received on).
        Raises RuntimeError if no IPV6_PKTINFO control message was received.
        """
        data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
        # Pick the pktinfo control message instead of assuming it is the
        # first (or only) ancillary item.
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socket.IPV6_PKTINFO:
                info = In6Pktinfo.from_buffer_copy(cmsg_data)
                break
        else:
            raise RuntimeError("no IPV6_PKTINFO ancillary data received")
        dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
        dst_iface = socket.if_indextoname(info.ipi6_ifindex)

        tx_obj = {
            "data": data,
            "src_ip": address[0],
            "dst_ip": dst_ip,
            "dst_iface": dst_iface,
        }
        return tx_obj


class BaseTestIP6Ouput(VnetTestTemplate):
    """Common two-vnet topology and listener logic for the output tests."""

    # Each interface carries a pair of prefixes; the tests below use the
    # second element of the pair as the address on vnet2's end.
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2", "if3"]},
        "vnet2": {"ifaces": ["if1", "if2", "if3"]},
        "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
        "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
        "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
    }
    DEFAULT_PORT = 45365

    def _vnet2_handler(self, vnet, obj_map, pipe, ip: str, os_ifname: str = None):
        """Generic listener that sends the first received packet with
        metadata back to the sender via pipe.

        The link-local address map is sent first; it doubles as the
        "listener is ready" notification for the test body.
        """
        ll_data = ToolsHelper.get_linklocals()
        # Start listener
        ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
        try:
            pipe.send(ll_data)
            tx_obj = ss.recv()
        finally:
            ss.close()
        tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
        pipe.send(tx_obj)


class TestIP6Output(BaseTestIP6Ouput):
    def vnet2_handler(self, vnet, obj_map, pipe):
        """Listens on the first IPv6 address of if2"""
        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, obj_map, pipe, ip, None)

    @pytest.mark.require_user("root")
    def test_output6_base(self):
        """Tests simple UDP output"""
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip = str(ifaddr.ip)

        data = bytes("AAAA", "utf-8")
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            s.sendto(data, (ip, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"

    @pytest.mark.require_user("root")
    def test_output6_nhop(self):
        """Tests UDP output with custom nhop set"""
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip_dst = str(ifaddr.ip)
        # Pick nexthop on if1
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
        ip_next = str(ifaddr.ip)
        sin6_next = SaHelper.ip6_sa(ip_next, 0)

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            data = bytes("AAAA", "utf-8")
            s.sendto(data, (ip_dst, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip_dst
        # The packet must arrive via the nexthop interface, not via if2
        assert rx_obj["dst_iface_alias"] == "if1"

    @pytest.mark.parametrize(
        "params",
        [
            # src: src-ip, if: src-interface, esrc: expected-src,
            # eif: expected-rx-interface, ex: expected OSError errno
            pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
            pytest.param(
                {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
                id="iponly1",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::1",
                    "if": "if3",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ipandif",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::aaaa",
                    "ex": errno.EADDRNOTAVAIL,
                },
                id="nolocalip",
            ),
            pytest.param(
                {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_pktinfo(self, params):
        """Tests UDP output with source address/interface set via IPV6_PKTINFO"""
        second_vnet = self.vnet_map["vnet2"]
        vnet = self.vnet

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        dst_ip = str(ifaddr.ip)

        src_ip = params.get("src", "")
        src_ifname = params.get("if", "")
        expected_ip = params.get("esrc", "")
        expected_ifname = params.get("eif", "")
        # Named so that it does not shadow the errno module
        expected_errno = params.get("ex", 0)

        pktinfo = In6Pktinfo()
        if src_ip:
            for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
                pktinfo.ipi6_addr[i] = b
        if src_ifname:
            os_ifname = vnet.iface_alias_map[src_ifname].name
            pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)

        # Wait for the child to become ready
        self.wait_object(second_vnet.pipe)
        data = bytes("AAAA", "utf-8")

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            try:
                s.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)
                )
                s.sendto(data, (dst_ip, self.DEFAULT_PORT))
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return
            # An expected failure that did not happen is a test failure
            assert not expected_errno, "expected errno {} was not raised".format(
                expected_errno
            )

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == dst_ip
        if expected_ip:
            assert rx_obj["src_ip"] == expected_ip
        if expected_ifname:
            assert rx_obj["dst_iface_alias"] == expected_ifname


class TestIP6OutputLL(BaseTestIP6Ouput):
    def vnet2_handler(self, vnet, obj_map, pipe):
        """Listens on the first link-local address of if2"""
        os_ifname = vnet.iface_alias_map["if2"].name
        ll_data = ToolsHelper.get_linklocals()
        ll_ip, _ = ll_data[os_ifname][0]
        self._vnet2_handler(vnet, obj_map, pipe, ll_ip, os_ifname)

    @pytest.mark.require_user("root")
    def test_output6_linklocal(self):
        """Tests UDP output to a link-local destination"""
        second_vnet = self.vnet_map["vnet2"]

        # Wait for the child to become ready
        ll_data = self.wait_object(second_vnet.pipe)

        # Pick LL address on if2 vnet2's end
        ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
        # Get local interface scope
        os_ifname = self.vnet.iface_alias_map["if2"].name
        scopeid = socket.if_nametoindex(os_ifname)

        data = bytes("AAAA", "utf-8")
        target = (ip, self.DEFAULT_PORT, 0, scopeid)
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))
            s.sendto(data, target)

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"


class TestIP6OutputNhopLL(BaseTestIP6Ouput):
    def vnet2_handler(self, vnet, obj_map, pipe):
        """Listens on the first IPv6 address of if2"""
        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, obj_map, pipe, ip, None)

    @pytest.mark.require_user("root")
    def test_output6_nhop_linklocal(self):
        """Tests UDP output with custom link-local nhop set"""
        second_vnet = self.vnet_map["vnet2"]

        # Wait for the child to become ready
        ll_data = self.wait_object(second_vnet.pipe)

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip_dst = str(ifaddr.ip)
        # Pick nexthop on if1
        ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
        # Get local interfaces
        os_ifname = self.vnet.iface_alias_map["if1"].name
        scopeid = socket.if_nametoindex(os_ifname)
        sin6_next = SaHelper.ip6_sa(ip_next, scopeid)

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

            data = bytes("AAAA", "utf-8")
            s.sendto(data, (ip_dst, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip_dst
        assert rx_obj["dst_iface_alias"] == "if1"


class TestIP6OutputScope(BaseTestIP6Ouput):
    def vnet2_handler(self, vnet, obj_map, pipe):
        """Listens on the address/interface requested by the test via pipe.

        The test sends an (ip, interface alias) tuple.  When ip is None the
        first link-local address of the given interface is used.
        """
        bind_ip, bind_ifp = self.wait_object(pipe)
        if bind_ip is None:
            os_ifname = vnet.iface_alias_map[bind_ifp].name
            ll_data = ToolsHelper.get_linklocals()
            bind_ip, _ = ll_data[os_ifname][0]
        if bind_ifp is not None:
            # Translate the alias to the OS interface name
            bind_ifp = vnet.iface_alias_map[bind_ifp].name
        print("## BIND {}%{}".format(bind_ip, bind_ifp))
        self._vnet2_handler(vnet, obj_map, pipe, bind_ip, bind_ifp)

    @pytest.mark.parametrize(
        "params",
        [
            # sif/dif: source/destination interface (for link-local addr)
            # sip/dip: source/destination ip (for non-LL addr)
            # ex: OSError errno that sendto() must raise
            pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
            pytest.param(
                {
                    "sif": "if1",
                    "dif": "if2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif1",
            ),
            pytest.param(
                {
                    "sif": "if1",
                    "dip": "2001:db8:b::2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif2",
            ),
            pytest.param(
                {
                    "sip": "2001:db8:a::1",
                    "dif": "if2",
                },
                id="gu_to_ll",
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_linklocal_scope(self, params):
        """Tests UDP output across source/destination address scopes"""
        second_vnet = self.vnet_map["vnet2"]

        src_ifp = params.get("sif")
        src_ip = params.get("sip")
        dst_ifp = params.get("dif")
        dst_ip = params.get("dip")
        # Named so that it does not shadow the errno module
        expected_errno = params.get("ex", 0)

        # Send ifp/IP to bind on
        second_vnet.pipe.send((dst_ip, dst_ifp))

        # Wait for the child to become ready
        remote_ll = self.wait_object(second_vnet.pipe)

        if dst_ip is None:
            # Pick LL address on dst_ifp vnet2's end
            dst_ip, _ = remote_ll[second_vnet.iface_alias_map[dst_ifp].name][0]
            # Get local interface scope
            os_ifname = self.vnet.iface_alias_map[dst_ifp].name
            scopeid = socket.if_nametoindex(os_ifname)
            target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
        else:
            target = (dst_ip, self.DEFAULT_PORT, 0, 0)

        # Bind
        if src_ip is None:
            local_ll = ToolsHelper.get_linklocals()
            os_ifname = self.vnet.iface_alias_map[src_ifp].name
            src_ip, _ = local_ll[os_ifname][0]
            scopeid = socket.if_nametoindex(os_ifname)
            src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
        else:
            src = (src_ip, self.DEFAULT_PORT, 0, 0)

        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            s.bind(src)
            data = bytes("AAAA", "utf-8")
            print("## TX packet {} -> {}".format(src, target))

            try:
                s.sendto(data, target)
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return
            # An expected failure that did not happen is a test failure
            assert not expected_errno, "expected errno {} was not raised".format(
                expected_errno
            )

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == dst_ip
        assert rx_obj["src_ip"] == src_ip
        # assert rx_obj["dst_iface_alias"] == "if2"


class TestIP6OutputMulticast(BaseTestIP6Ouput):
    def vnet2_handler(self, vnet, obj_map, pipe):
        """Joins the multicast group received via pipe on if2 and listens"""
        group = self.wait_object(pipe)
        os_ifname = vnet.iface_alias_map["if2"].name
        self._vnet2_handler(vnet, obj_map, pipe, group, os_ifname)

    @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
    @pytest.mark.require_user("root")
    def test_output6_multicast(self, group_scope):
        """Tests UDP output to multicast groups of different scopes"""
        second_vnet = self.vnet_map["vnet2"]

        group = "{}::3456".format(group_scope)
        second_vnet.pipe.send(group)

        # Send to the group via local if2
        ip = group
        os_ifname = self.vnet.iface_alias_map["if2"].name
        ifindex = socket.if_nametoindex(os_ifname)
        optval = struct.pack("I", ifindex)

        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)

            data = bytes("AAAA", "utf-8")

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)

            print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
            s.sendto(data, (ip, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"


class TestIP6OutputLoopback(SingleVnetTestTemplate):
    IPV6_PREFIXES = ["2001:db8:a::1/64"]
    DEFAULT_PORT = 45365

    def _local_target(self, scope):
        """Returns (ip, sockaddr tuple) of a local address for the scope.

        scope: "gu" (global unicast), "ll" (link-local) or "lo" (loopback).
        Raises ValueError for any other value.
        """
        if scope == "gu":
            ip = "2001:db8:a::1"
            addr_tuple = (ip, self.DEFAULT_PORT)
        elif scope == "ll":
            os_ifname = self.vnet.iface_alias_map["if1"].name
            ifindex = socket.if_nametoindex(os_ifname)
            ll_data = ToolsHelper.get_linklocals()
            ip, _ = ll_data[os_ifname][0]
            addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
        elif scope == "lo":
            ip = "::1"
            ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
            addr_tuple = (ip, self.DEFAULT_PORT)
        else:
            raise ValueError("unknown scope: {}".format(scope))
        print("address: {}".format(addr_tuple))
        return ip, addr_tuple

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav", marks=pytest.mark.skip(reason="fails")),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_tcp(self, scope, source_validation):
        """Tests IPv6 TCP connection to the local IPv6 address"""

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._local_target(scope)

        start = time.perf_counter()
        with socket.socket(
            socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            ss.bind(addr_tuple)
            ss.listen()
            with socket.socket(
                socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
            ) as s:
                s.settimeout(2.0)
                s.connect(addr_tuple)
                conn, from_addr = ss.accept()
                with conn:
                    duration = time.perf_counter() - start

        assert from_addr[0] == ip
        assert duration < 1.0

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav", marks=pytest.mark.skip(reason="fails")),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_udp(self, scope, source_validation):
        """Tests IPv6 UDP delivery to the local IPv6 address"""

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._local_target(scope)
        payload = bytes("AAAA", "utf-8")

        start = time.perf_counter()
        # Datagram sockets: the previous version was a verbatim copy of the
        # TCP test and never sent any UDP traffic.
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            # Fail instead of hanging if the datagram is never delivered
            ss.settimeout(2.0)
            ss.bind(addr_tuple)
            with socket.socket(
                socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
            ) as s:
                s.settimeout(2.0)
                s.connect(addr_tuple)
                s.send(payload)
                rx_data, from_addr = ss.recvfrom(4096)
                duration = time.perf_counter() - start

        assert rx_data == payload
        assert from_addr[0] == ip
        assert duration < 1.0