"""Regression tests for the ping utility.

The module contains three groups of tests:

* ``TestPing.test_ping`` runs ``ping`` as a subprocess and compares its
  return code, redacted stdout and stderr with recorded values.
* ``TestPing.test_ping_46`` runs the same arguments with ``-4`` and ``-6``.
* ``TestPing.test_pinger`` uses :func:`pinger`, which answers the echo
  requests of ``/sbin/ping`` over a tun interface with packets crafted
  with scapy.
"""
import pytest

import logging
import os
import re
import subprocess

from atf_python.sys.net.vnet import IfaceFactory
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.net.tools import ToolsHelper
from typing import List
from typing import Optional

# The scapy logger level is raised before scapy is imported; keep the order.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import scapy.all as sc


def build_response_packet(echo, ip, icmp, oip_ihl, special):
    """Build the packet sent back in response to a captured echo request.

    Note that layers taken from ``echo`` are modified, and ``ip.options``
    is reset to ``""`` when the original packet gets embedded.

    :param echo: Packet received from ping; must contain IP and ICMP layers
    :param ip: Outer IP header of the response
    :param icmp: ICMP header of the response
    :param oip_ihl: If truthy, header length to set on the embedded
        (original) IP header
    :param special: One of `no-payload`, `tcp`, `udp`, `warp`, `wrong`,
        or None for a regular response
    :return: The assembled scapy packet
    """
    # For these ICMP types the response is ip / icmp / payload; for any
    # other type the original IP and ICMP headers are embedded as well.
    icmp_id_seq_types = [0, 8, 13, 14, 15, 16, 17, 18, 37, 38]
    oip = echo[sc.IP]
    oicmp = echo[sc.ICMP]
    # Grab the payload before it is stripped from the original headers
    load = echo[sc.ICMP].payload
    oip[sc.IP].remove_payload()
    oicmp[sc.ICMP].remove_payload()
    oicmp.type = 8

    # As if the original IP packet had these set
    # (None lets scapy recompute ihl, len and chksum)
    oip.ihl = None
    oip.len = None
    oip.id = 1
    oip.flags = ip.flags
    oip.chksum = None
    oip.options = ip.options

    # Inner packet (oip) options
    if oip_ihl:
        oip.ihl = oip_ihl

    # Special options
    if special == "no-payload":
        load = ""
    if special == "tcp":
        oip.proto = "tcp"
        tcp = sc.TCP(sport=1234, dport=5678)
        return ip / icmp / oip / tcp
    if special == "udp":
        oip.proto = "udp"
        udp = sc.UDP(sport=1234, dport=5678)
        return ip / icmp / oip / udp
    if special == "warp":
        # Build a packet with a timestamp of INT_MAX (time-warped packet):
        # the first 8 payload bytes are replaced with 0xff
        payload_no_timestamp = sc.bytes_hex(load)[16:]
        load = (b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp)
    if special == "wrong":
        # Build a packet with a wrong last byte
        payload_no_last_byte = sc.bytes_hex(load)[:-2]
        load = (sc.hex_bytes(payload_no_last_byte)) + b"\x00"

    if icmp.type in icmp_id_seq_types:
        pkt = ip / icmp / load
    else:
        ip.options = ""
        pkt = ip / icmp / oip / oicmp / load
    return pkt


def generate_ip_options(opts):
    """Return the scapy IP option(s) selected by ``opts``.

    For every option other than `EOL`, `NOP` and `NOP-40` the sysctl
    ``net.inet.ip.process_options`` is set to 0 as a side effect.

    :param opts: One of `EOL`, `NOP`, `NOP-40`, `RR`, `RR-same`,
        `RR-trunc`, `LSRR`, `LSRR-trunc`, `SSRR`, `SSRR-trunc`, `unk`,
        `unk-40`, or a falsy value
    :return: A scapy IP option, or ``""`` when ``opts`` is falsy or not
        one of the names above
    """
    if not opts:
        return ""

    routers = [
        "192.0.2.10",
        "192.0.2.20",
        "192.0.2.30",
        "192.0.2.40",
        "192.0.2.50",
        "192.0.2.60",
        "192.0.2.70",
        "192.0.2.80",
        "192.0.2.90",
    ]
    routers_zero = [0, 0, 0, 0, 0, 0, 0, 0, 0]

    # NOTE(review): presumably this keeps the kernel from acting on the
    # crafted options before ping sees them - confirm.
    if opts in (
        "RR",
        "RR-same",
        "RR-trunc",
        "LSRR",
        "LSRR-trunc",
        "SSRR",
        "SSRR-trunc",
        "unk",
        "unk-40",
    ):
        ToolsHelper.set_sysctl("net.inet.ip.process_options", 0)

    if opts == "EOL":
        options = sc.IPOption(b"\x00")
    elif opts == "NOP":
        options = sc.IPOption(b"\x01")
    elif opts == "NOP-40":
        options = sc.IPOption(b"\x01" * 40)
    elif opts == "RR":
        options = sc.IPOption_RR(pointer=40, routers=routers)
    elif opts == "RR-same":
        options = sc.IPOption_RR(pointer=3, routers=routers_zero)
    elif opts == "RR-trunc":
        options = sc.IPOption_RR(length=7, routers=routers_zero)
    elif opts == "LSRR":
        options = sc.IPOption_LSRR(routers=routers)
    elif opts == "LSRR-trunc":
        options = sc.IPOption_LSRR(length=3, routers=routers_zero)
    elif opts == "SSRR":
        options = sc.IPOption_SSRR(routers=routers)
    elif opts == "SSRR-trunc":
        options = sc.IPOption_SSRR(length=3, routers=routers_zero)
    elif opts == "unk":
        options = sc.IPOption(b"\x9f")
    elif opts == "unk-40":
        options = sc.IPOption(b"\x9f" * 40)
    else:
        options = ""
    return options


def pinger(
    # Required arguments
    # Avoid setting defaults on these arguments,
    # as we want to set them explicitly in the tests
    iface: str,
    /,
    src: sc.scapy.fields.SourceIPField,
    dst: sc.scapy.layers.inet.DestIPField,
    icmp_type: sc.scapy.fields.ByteEnumField,
    icmp_code: sc.scapy.fields.MultiEnumField,
    # IP arguments
    ihl: Optional[sc.scapy.fields.BitField] = None,
    flags: Optional[sc.scapy.fields.FlagsField] = None,
    opts: Optional[str] = None,
    oip_ihl: Optional[sc.scapy.fields.BitField] = None,
    special: Optional[str] = None,
    # ICMP arguments
    # Match names with <netinet/ip_icmp.h>
    icmp_pptr: sc.scapy.fields.ByteField = 0,
    icmp_gwaddr: sc.scapy.fields.IPField = "0.0.0.0",
    icmp_nextmtu: sc.scapy.fields.ShortField = 0,
    icmp_otime: sc.scapy.layers.inet.ICMPTimeStampField = 0,
    icmp_rtime: sc.scapy.layers.inet.ICMPTimeStampField = 0,
    icmp_ttime: sc.scapy.layers.inet.ICMPTimeStampField = 0,
    icmp_mask: sc.scapy.fields.IPField = "0.0.0.0",
    request: Optional[str] = None,
    # Miscellaneous arguments
    count: int = 1,
    dup: bool = False,
) -> subprocess.CompletedProcess:
    """P I N G E R

    Echo reply faker

    Configures the tun interface with ``src`` and ``dst``, starts
    ``/sbin/ping`` against ``dst`` and, for each of ``count`` packets read
    from the tun interface, sends back a crafted response whose source is
    ``dst`` and whose destination is ``src``.

    :param str iface: Interface to send packet to
    :keyword src: Source packet IP
    :type src: class:`scapy.fields.SourceIPField`
    :keyword dst: Destination packet IP
    :type dst: class:`scapy.layers.inet.DestIPField`
    :keyword icmp_type: ICMP type
    :type icmp_type: class:`scapy.fields.ByteEnumField`
    :keyword icmp_code: ICMP code
    :type icmp_code: class:`scapy.fields.MultiEnumField`

    :keyword ihl: Internet Header Length, defaults to None
    :type ihl: class:`scapy.fields.BitField`, optional
    :keyword flags: IP flags - one of `DF`, `MF` or `evil`, defaults to None
    :type flags: class:`scapy.fields.FlagsField`, optional
    :keyword opts: Include IP options - one of `EOL`, `NOP`, `NOP-40`, `unk`,
        `unk-40`, `RR`, `RR-same`, `RR-trunc`, `LSRR`, `LSRR-trunc`, `SSRR` or
        `SSRR-trunc`, defaults to None
    :type opts: str, optional
    :keyword oip_ihl: Inner packet's Internet Header Length, defaults to None
    :type oip_ihl: class:`scapy.fields.BitField`, optional
    :keyword special: Send a special packet - one of `no-payload`, `tcp`,
        `udp`, `wrong` or `warp`, defaults to None
    :type special: str, optional
    :keyword icmp_pptr: ICMP pointer, defaults to 0
    :type icmp_pptr: class:`scapy.fields.ByteField`
    :keyword icmp_gwaddr: ICMP gateway IP address, defaults to "0.0.0.0"
    :type icmp_gwaddr: class:`scapy.fields.IPField`
    :keyword icmp_nextmtu: ICMP next MTU, defaults to 0
    :type icmp_nextmtu: class:`scapy.fields.ShortField`
    :keyword icmp_otime: ICMP originate timestamp, defaults to 0
    :type icmp_otime: class:`scapy.layers.inet.ICMPTimeStampField`
    :keyword icmp_rtime: ICMP receive timestamp, defaults to 0
    :type icmp_rtime: class:`scapy.layers.inet.ICMPTimeStampField`
    :keyword icmp_ttime: ICMP transmit timestamp, defaults to 0
    :type icmp_ttime: class:`scapy.layers.inet.ICMPTimeStampField`
    :keyword icmp_mask: ICMP address mask, defaults to "0.0.0.0"
    :type icmp_mask: class:`scapy.fields.IPField`
    :keyword request: Request type - one of `mask` or `timestamp`,
        defaults to None
    :type request: str, optional
    :keyword count: Number of packets to send, defaults to 1
    :type count: int
    :keyword dup: Duplicate packets, defaults to `False`
    :type dup: bool

    :return: A class:`subprocess.CompletedProcess` with the output from the
        ping utility
    :rtype: class:`subprocess.CompletedProcess`
    """
    # NOTE(review): tun is never explicitly closed here - confirm whether
    # that is intentional.
    tun = sc.TunTapInterface(iface)
    subprocess.run(["ifconfig", tun.iface, "up"], check=True)
    subprocess.run(["ifconfig", tun.iface, src, dst], check=True)
    ip_opts = generate_ip_options(opts)
    # The response travels in the opposite direction: dst -> src
    ip = sc.IP(ihl=ihl, flags=flags, src=dst, dst=src, options=ip_opts)
    command = [
        "/sbin/ping",
        "-c",
        str(count),
        "-t",
        str(count),
        "-v",
    ]
    if request == "mask":
        command += ["-Mm"]
    if request == "timestamp":
        command += ["-Mt"]
    if special:
        command += ["-p1"]
    if opts in [
        "RR",
        "RR-same",
        "RR-trunc",
        "LSRR",
        "LSRR-trunc",
        "SSRR",
        "SSRR-trunc",
    ]:
        command += ["-R"]
    command += [dst]
    with subprocess.Popen(
        args=command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    ) as ping:
        for _ in range(count):
            echo = tun.recv()
            # Reuse id and seq of the received packet in the response
            icmp = sc.ICMP(
                type=icmp_type,
                code=icmp_code,
                id=echo[sc.ICMP].id,
                seq=echo[sc.ICMP].seq,
                ts_ori=icmp_otime,
                ts_rx=icmp_rtime,
                ts_tx=icmp_ttime,
                gw=icmp_gwaddr,
                ptr=icmp_pptr,
                addr_mask=icmp_mask,
                nexthopmtu=icmp_nextmtu,
            )
            pkt = build_response_packet(echo, ip, icmp, oip_ihl, special)
            tun.send(pkt)
            if dup is True:
                tun.send(pkt)
        stdout, stderr = ping.communicate()
    return subprocess.CompletedProcess(
        ping.args, ping.returncode, stdout, stderr
    )


def redact(output):
    """Redact some elements of ping's output

    Addresses after `localhost` and `from`, the values of `hlim=`, `ttl=`
    and `time=`, and the slash-separated round-trip figures are removed.

    :param str output: Output of the ping utility
    :return: The redacted output
    :rtype: str
    """
    # Raw strings: `\(` and `\.` are not valid escapes in a plain literal
    pattern_replacements = [
        (r"localhost \([0-9]{1,3}(\.[0-9]{1,3}){3}\)", "localhost"),
        (r"from [0-9]{1,3}(\.[0-9]{1,3}){3}", "from"),
        (r"hlim=[0-9]*", "hlim="),
        (r"ttl=[0-9]*", "ttl="),
        (r"time=[0-9.-]*", "time="),
        (r"[0-9\.]+/[0-9.]+", "/"),
    ]
    for pattern, repl in pattern_replacements:
        output = re.sub(pattern, repl, output)
    return output


class TestPing(SingleVnetTestTemplate):
    """Tests for the ping utility, run through SingleVnetTestTemplate."""

    IPV6_PREFIXES: List[str] = ["2001:db8::1/64"]
    IPV4_PREFIXES: List[str] = ["192.0.2.1/24"]

    # Each param in testdata contains a dictionary with the command,
    # and the expected outcome (returncode, redacted stdout, and stderr)
    testdata = [
        pytest.param(
            {
                "args": "ping -4 -c1 -s56 -t1 localhost",
                "returncode": 0,
                "stdout": """\
PING localhost: 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms

--- localhost ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_4_c1_s56_t1_localhost",
        ),
        pytest.param(
            {
                "args": "ping -6 -c1 -s8 -t1 localhost",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) ::1 --> ::1
16 bytes from ::1, icmp_seq=0 hlim= time= ms

--- localhost ping6 statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_6_c1_s8_t1_localhost",
        ),
        pytest.param(
            {
                "args": "ping -A -c1 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms

--- 192.0.2.1 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_A_c1_192_0_2_1",
        ),
        pytest.param(
            {
                "args": "ping -A -c1 192.0.2.2",
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_A_c1_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -A -c1 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1
16 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms

--- 2001:db8::1 ping6 statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_A_c1_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -A -c1 2001:db8::2",
                "returncode": 2,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_A_c1_2001_db8__2",
        ),
        pytest.param(
            {
                "args": "ping -A -c3 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms
64 bytes from: icmp_seq=1 ttl= time= ms
64 bytes from: icmp_seq=2 ttl= time= ms

--- 192.0.2.1 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_A_3_192_0.2.1",
        ),
        pytest.param(
            {
                "args": "ping -A -c3 192.0.2.2",
                "returncode": 2,
                "stdout": """\
\x07\x07PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_A_c3_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -A -c3 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1
16 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms
16 bytes from 2001:db8::1, icmp_seq=1 hlim= time= ms
16 bytes from 2001:db8::1, icmp_seq=2 hlim= time= ms

--- 2001:db8::1 ping6 statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_A_c3_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -A -c3 2001:db8::2",
                "returncode": 2,
                "stdout": """\
\x07\x07PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_A_c3_2001_db8__2",
        ),
        pytest.param(
            {
                "args": "ping -c1 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms

--- 192.0.2.1 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_c1_192_0_2_1",
        ),
        pytest.param(
            {
                "args": "ping -c1 192.0.2.2",
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_c1_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -c1 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1
16 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms

--- 2001:db8::1 ping6 statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_c1_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -c1 2001:db8::2",
                "returncode": 2,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_c1_2001_db8__2",
        ),
        pytest.param(
            {
                "args": "ping -c1 -S127.0.0.1 -s56 -t1 localhost",
                "returncode": 0,
                "stdout": """\
PING localhost from: 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms

--- localhost ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_c1_S127_0_0_1_s56_t1_localhost",
        ),
        pytest.param(
            {
                "args": "ping -c1 -S::1 -s8 -t1 localhost",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) ::1 --> ::1
16 bytes from ::1, icmp_seq=0 hlim= time= ms

--- localhost ping6 statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_c1_S__1_s8_t1_localhost",
        ),
        pytest.param(
            {
                "args": "ping -c3 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms
64 bytes from: icmp_seq=1 ttl= time= ms
64 bytes from: icmp_seq=2 ttl= time= ms

--- 192.0.2.1 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_c3_192_0_2_1",
        ),
        pytest.param(
            {
                "args": "ping -c3 192.0.2.2",
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_c3_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -c3 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1
16 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms
16 bytes from 2001:db8::1, icmp_seq=1 hlim= time= ms
16 bytes from 2001:db8::1, icmp_seq=2 hlim= time= ms

--- 2001:db8::1 ping6 statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_c3_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -c3 2001:db8::2",
                "returncode": 2,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_c3_2001_db8__2",
        ),
        pytest.param(
            {
                "args": "ping -q -c1 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes

--- 192.0.2.1 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_q_c1_192_0_2_1",
        ),
        pytest.param(
            {
                "args": "ping -q -c1 192.0.2.2",
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_q_c1_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -q -c1 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1

--- 2001:db8::1 ping6 statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_q_c1_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -q -c1 2001:db8::2",
                "returncode": 2,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_q_c1_2001_db8__2",
        ),
        pytest.param(
            {
                "args": "ping -q -c3 192.0.2.1",
                "returncode": 0,
                "stdout": """\
PING 192.0.2.1 (192.0.2.1): 56 data bytes

--- 192.0.2.1 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
            },
            id="_q_c3_192_0_2_1",
        ),
        pytest.param(
            {
                "args": "ping -q -c3 192.0.2.2",
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes

--- 192.0.2.2 ping statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_q_c3_192_0_2_2",
        ),
        pytest.param(
            {
                "args": "ping -q -c3 2001:db8::1",
                "returncode": 0,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1

--- 2001:db8::1 ping6 statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/std-dev = /// ms
""",
                "stderr": "",
            },
            id="_q_c3_2001_db8__1",
        ),
        pytest.param(
            {
                "args": "ping -q -c3 2001:db8::2",
                "returncode": 2,
                "stdout": """\
PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2

--- 2001:db8::2 ping6 statistics ---
3 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
            },
            id="_q_c3_2001_db8__2",
        ),
    ]

    @pytest.mark.parametrize("expected", testdata)
    def test_ping(self, expected):
        """Test ping"""
        ping = subprocess.run(
            expected["args"].split(),
            capture_output=True,
            timeout=15,
            text=True,
        )
        assert ping.returncode == expected["returncode"]
        assert redact(ping.stdout) == expected["stdout"]
        assert ping.stderr == expected["stderr"]

    # Each param in ping46_testdata contains a dictionary with the arguments
    # and the expected outcome (returncode, redacted stdout, and stderr)
    # common to `ping -4` and `ping -6`
    ping46_testdata = [
        pytest.param(
            {
                "args": "-Wx localhost",
                "returncode": os.EX_USAGE,
                "stdout": "",
                "stderr": "ping: invalid timing interval: `x'\n",
            },
            marks=pytest.mark.skip("XXX currently failing"),
            id="_Wx_localhost",
        ),
    ]

    @pytest.mark.parametrize("expected", ping46_testdata)
    def test_ping_46(self, expected):
        """Test ping -4/ping -6"""
        for version in [4, 6]:
            ping = subprocess.run(
                ["ping", f"-{version}"] + expected["args"].split(),
                capture_output=True,
                timeout=15,
                text=True,
            )
            assert ping.returncode == expected["returncode"]
            assert redact(ping.stdout) == expected["stdout"]
            assert ping.stderr == expected["stderr"]

    # Each param in pinger_testdata contains a dictionary with the keywords to
    # `pinger()` and a dictionary with the expected outcome (returncode,
    # stdout, stderr, and if ping's output is redacted)
    pinger_testdata = [
        pytest.param(
            {
                "src": "192.0.2.1",
                "dst": "192.0.2.2",
                "icmp_type": 0,
                "icmp_code": 0,
            },
            {
                "returncode": 0,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
                "redacted": True,
            },
            id="_0_0",
        ),
        pytest.param(
            {
                "src": "192.0.2.1",
                "dst": "192.0.2.2",
                "icmp_type": 0,
                "icmp_code": 0,
                "opts": "NOP-40",
            },
            {
                "returncode": 0,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms
wrong total length 124 instead of 84
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP
NOP

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
                "redacted": True,
            },
            id="_0_0_opts_NOP_40",
        ),
        pytest.param(
            {
                "src": "192.0.2.1",
                "dst": "192.0.2.2",
                "icmp_type": 0,
                "icmp_code": 0,
                "opts": "unk",
            },
            {
                "returncode": 0,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes
64 bytes from: icmp_seq=0 ttl= time= ms
wrong total length 88 instead of 84
unknown option 9f

--- 192.0.2.2 ping statistics ---
1 packets transmitted, 1 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = /// ms
""",
                "stderr": "",
                "redacted": True,
            },
            marks=pytest.mark.skip("XXX currently failing"),
            id="_0_0_opts_unk",
        ),
        # NOTE(review): the column spacing and any trailing whitespace of
        # the IP header dump below could not be confirmed - verify against
        # ping's actual output before enabling this test.
        pytest.param(
            {
                "src": "192.0.2.1",
                "dst": "192.0.2.2",
                "icmp_type": 3,
                "icmp_code": 1,
                "opts": "NOP-40",
            },
            {
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes
132 bytes from 192.0.2.2: Destination Host Unreachable
Vr HL TOS Len ID Flg off TTL Pro cks Src Dst
 4 f 00 007c 0001 0 0000 40 01 d868 192.0.2.1 192.0.2.2 01010101010101010101010101010101010101010101010101010101010101010101010101010101


--- 192.0.2.2 ping statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
                "redacted": False,
            },
            marks=pytest.mark.skip("XXX currently failing"),
            id="_3_1_opts_NOP_40",
        ),
        # NOTE(review): the column spacing and any trailing whitespace of
        # the IP header dump below could not be confirmed - verify against
        # ping's actual output before enabling this test.
        pytest.param(
            {
                "src": "192.0.2.1",
                "dst": "192.0.2.2",
                "icmp_type": 3,
                "icmp_code": 1,
                "flags": "DF",
            },
            {
                "returncode": 2,
                "stdout": """\
PING 192.0.2.2 (192.0.2.2): 56 data bytes
92 bytes from 192.0.2.2: Destination Host Unreachable
Vr HL TOS Len ID Flg off TTL Pro cks Src Dst
 4 5 00 0054 0001 2 0000 40 01 b6a4 192.0.2.1 192.0.2.2


--- 192.0.2.2 ping statistics ---
1 packets transmitted, 0 packets received, 100.0% packet loss
""",
                "stderr": "",
                "redacted": False,
            },
            marks=pytest.mark.skip("XXX currently failing"),
            id="_3_1_flags_DF",
        ),
    ]

    @pytest.mark.parametrize("pinger_kargs, expected", pinger_testdata)
    @pytest.mark.require_progs(["scapy"])
    @pytest.mark.require_user("root")
    def test_pinger(self, pinger_kargs, expected):
        """Test ping using pinger(), a reply faker"""
        iface = IfaceFactory().create_iface("", "tun")[0].name
        ping = pinger(iface, **pinger_kargs)
        assert ping.returncode == expected["returncode"]
        if expected["redacted"]:
            assert redact(ping.stdout) == expected["stdout"]
        else:
            assert ping.stdout == expected["stdout"]
        assert ping.stderr == expected["stderr"]