1import pytest 2 3import logging 4import os 5import re 6import subprocess 7 8from atf_python.sys.net.vnet import IfaceFactory 9from atf_python.sys.net.vnet import SingleVnetTestTemplate 10from atf_python.sys.net.tools import ToolsHelper 11from typing import List 12from typing import Optional 13 14logging.getLogger("scapy").setLevel(logging.CRITICAL) 15import scapy.all as sc 16 17 18def build_response_packet(echo, ip, icmp, oip_ihl, special): 19 icmp_id_seq_types = [0, 8, 13, 14, 15, 16, 17, 18, 37, 38] 20 oip = echo[sc.IP] 21 oicmp = echo[sc.ICMP] 22 load = echo[sc.ICMP].payload 23 oip[sc.IP].remove_payload() 24 oicmp[sc.ICMP].remove_payload() 25 oicmp.type = 8 26 27 # As if the original IP packet had these set 28 oip.ihl = None 29 oip.len = None 30 oip.id = 1 31 oip.flags = ip.flags 32 oip.chksum = None 33 oip.options = ip.options 34 35 # Inner packet (oip) options 36 if oip_ihl: 37 oip.ihl = oip_ihl 38 39 # Special options 40 if special == "no-payload": 41 load = "" 42 if special == "tcp": 43 oip.proto = "tcp" 44 tcp = sc.TCP(sport=1234, dport=5678) 45 return ip / icmp / oip / tcp 46 if special == "udp": 47 oip.proto = "udp" 48 udp = sc.UDP(sport=1234, dport=5678) 49 return ip / icmp / oip / udp 50 if special == "warp": 51 # Build a package with a timestamp of INT_MAX 52 # (time-warped package) 53 payload_no_timestamp = sc.bytes_hex(load)[16:] 54 load = (b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp) 55 if special == "wrong": 56 # Build a package with a wrong last byte 57 payload_no_last_byte = sc.bytes_hex(load)[:-2] 58 load = (sc.hex_bytes(payload_no_last_byte)) + b"\x00" 59 60 if icmp.type in icmp_id_seq_types: 61 pkt = ip / icmp / load 62 else: 63 ip.options = "" 64 pkt = ip / icmp / oip / oicmp / load 65 return pkt 66 67 68def generate_ip_options(opts): 69 if not opts: 70 return "" 71 72 routers = [ 73 "192.0.2.10", 74 "192.0.2.20", 75 "192.0.2.30", 76 "192.0.2.40", 77 "192.0.2.50", 78 "192.0.2.60", 79 "192.0.2.70", 80 "192.0.2.80", 81 "192.0.2.90", 82 ] 83 routers_zero = [0, 0, 0, 0, 0, 0, 0, 0, 0] 84 if opts == "EOL": 85 options = sc.IPOption(b"\x00") 86 elif opts == "NOP": 87 options = sc.IPOption(b"\x01") 88 elif opts == "NOP-40": 89 options = sc.IPOption(b"\x01" * 40) 90 elif opts == "RR": 91 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 92 options = sc.IPOption_RR(pointer=40, routers=routers) 93 elif opts == "RR-same": 94 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 95 options = sc.IPOption_RR(pointer=3, routers=routers_zero) 96 elif opts == "RR-trunc": 97 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 98 options = sc.IPOption_RR(length=7, routers=routers_zero) 99 elif opts == "LSRR": 100 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 101 options = sc.IPOption_LSRR(routers=routers) 102 elif opts == "LSRR-trunc": 103 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 104 options = sc.IPOption_LSRR(length=3, routers=routers_zero) 105 elif opts == "SSRR": 106 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 107 options = sc.IPOption_SSRR(routers=routers) 108 elif opts == "SSRR-trunc": 109 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 110 options = sc.IPOption_SSRR(length=3, routers=routers_zero) 111 elif opts == "unk": 112 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 113 options = sc.IPOption(b"\x9f") 114 elif opts == "unk-40": 115 ToolsHelper.set_sysctl("net.inet.ip.process_options", 0) 116 options = sc.IPOption(b"\x9f" * 40) 117 else: 118 options = "" 119 return options 120 121 122def pinger( 123 # Required arguments 124 # Avoid setting defaults on these arguments, 125 # as we want to set them explicitly in the tests 126 iface: str, 127 /, 128 src: sc.scapy.fields.SourceIPField, 129 dst: sc.scapy.layers.inet.DestIPField, 130 icmp_type: sc.scapy.fields.ByteEnumField, 131 icmp_code: sc.scapy.fields.MultiEnumField, 132 # IP arguments 133 ihl: Optional[sc.scapy.fields.BitField] = None, 134 flags: Optional[sc.scapy.fields.FlagsField] = None, 135 opts: Optional[str] = None, 136 oip_ihl: Optional[sc.scapy.fields.BitField] = None, 137 special: Optional[str] = None, 138 # ICMP arguments 139 # Match names with <netinet/ip_icmp.h> 140 icmp_pptr: sc.scapy.fields.ByteField = 0, 141 icmp_gwaddr: sc.scapy.fields.IPField = "0.0.0.0", 142 icmp_nextmtu: sc.scapy.fields.ShortField = 0, 143 icmp_otime: sc.scapy.layers.inet.ICMPTimeStampField = 0, 144 icmp_rtime: sc.scapy.layers.inet.ICMPTimeStampField = 0, 145 icmp_ttime: sc.scapy.layers.inet.ICMPTimeStampField = 0, 146 icmp_mask: sc.scapy.fields.IPField = "0.0.0.0", 147 request: Optional[str] = None, 148 # Miscellaneous arguments 149 count: int = 1, 150 dup: bool = False, 151) -> subprocess.CompletedProcess: 152 """P I N G E R 153 154 Echo reply faker 155 156 :param str iface: Interface to send packet to 157 :keyword src: Source packet IP 158 :type src: class:`scapy.fields.SourceIPField` 159 :keyword dst: Destination packet IP 160 :type dst: class:`scapy.layers.inet.DestIPField` 161 :keyword icmp_type: ICMP type 162 :type icmp_type: class:`scapy.fields.ByteEnumField` 163 :keyword icmp_code: ICMP code 164 :type icmp_code: class:`scapy.fields.MultiEnumField` 165 166 :keyword ihl: Internet Header Length, defaults to None 167 :type ihl: class:`scapy.fields.BitField`, optional 168 :keyword flags: IP flags - one of `DF`, `MF` or `evil`, defaults to None 169 :type flags: class:`scapy.fields.FlagsField`, optional 170 :keyword opts: Include IP options - one of `EOL`, `NOP`, `NOP-40`, `unk`, 171 `unk-40`, `RR`, `RR-same`, `RR-trunc`, `LSRR`, `LSRR-trunc`, `SSRR` or 172 `SSRR-trunc`, defaults to None 173 :type opts: str, optional 174 :keyword oip_ihl: Inner packet's Internet Header Length, defaults to None 175 :type oip_ihl: class:`scapy.fields.BitField`, optional 176 :keyword special: Send a special packet - one of `no-payload`, `tcp`, 177 `udp`, `wrong` or `warp`, defaults to None 178 :type special: str, optional 179 :keyword icmp_pptr: ICMP pointer, defaults to 0 180 :type icmp_pptr: class:`scapy.fields.ByteField` 181 :keyword icmp_gwaddr: ICMP gateway IP address, defaults to "0.0.0.0" 182 :type icmp_gwaddr: class:`scapy.fields.IPField` 183 :keyword icmp_nextmtu: ICMP next MTU, defaults to 0 184 :type icmp_nextmtu: class:`scapy.fields.ShortField` 185 :keyword icmp_otime: ICMP originate timestamp, defaults to 0 186 :type icmp_otime: class:`scapy.layers.inet.ICMPTimeStampField` 187 :keyword icmp_rtime: ICMP receive timestamp, defaults to 0 188 :type icmp_rtime: class:`scapy.layers.inet.ICMPTimeStampField` 189 :keyword icmp_ttime: ICMP transmit timestamp, defaults to 0 190 :type icmp_ttime: class:`scapy.layers.inet.ICMPTimeStampField` 191 :keyword icmp_mask: ICMP address mask, defaults to "0.0.0.0" 192 :type icmp_mask: class:`scapy.fields.IPField` 193 :keyword request: Request type - one of `mask` or `timestamp`, 194 defaults to None 195 :type request: str, optional 196 :keyword count: Number of packets to send, defaults to 1 197 :type count: int 198 :keyword dup: Duplicate packets, defaults to `False` 199 :type dup: bool 200 201 :return: A class:`subprocess.CompletedProcess` with the output from the 202 ping utility 203 :rtype: class:`subprocess.CompletedProcess` 204 """ 205 tun = sc.TunTapInterface(iface) 206 subprocess.run(["ifconfig", tun.iface, "up"], check=True) 207 subprocess.run(["ifconfig", tun.iface, src, dst], check=True) 208 ip_opts = generate_ip_options(opts) 209 ip = sc.IP(ihl=ihl, flags=flags, src=dst, dst=src, options=ip_opts) 210 command = [ 211 "/sbin/ping", 212 "-c", 213 str(count), 214 "-t", 215 str(count), 216 "-v", 217 ] 218 if request == "mask": 219 command += ["-Mm"] 220 if request == "timestamp": 221 command += ["-Mt"] 222 if special: 223 command += ["-p1"] 224 if opts in [ 225 "RR", 226 "RR-same", 227 "RR-trunc", 228 "LSRR", 229 "LSRR-trunc", 230 "SSRR", 231 "SSRR-trunc", 232 ]: 233 command += ["-R"] 234 command += [dst] 235 with subprocess.Popen( 236 args=command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True 237 ) as ping: 238 for dummy in range(count): 239 echo = tun.recv() 240 icmp = sc.ICMP( 241 type=icmp_type, 242 code=icmp_code, 243 id=echo[sc.ICMP].id, 244 seq=echo[sc.ICMP].seq, 245 ts_ori=icmp_otime, 246 ts_rx=icmp_rtime, 247 ts_tx=icmp_ttime, 248 gw=icmp_gwaddr, 249 ptr=icmp_pptr, 250 addr_mask=icmp_mask, 251 nexthopmtu=icmp_nextmtu, 252 ) 253 pkt = build_response_packet(echo, ip, icmp, oip_ihl, special) 254 tun.send(pkt) 255 if dup is True: 256 tun.send(pkt) 257 stdout, stderr = ping.communicate() 258 return subprocess.CompletedProcess( 259 ping.args, ping.returncode, stdout, stderr 260 ) 261 262 263def redact(output): 264 """Redact some elements of ping's output""" 265 pattern_replacements = [ 266 ("localhost \([0-9]{1,3}(\.[0-9]{1,3}){3}\)", "localhost"), 267 ("from [0-9]{1,3}(\.[0-9]{1,3}){3}", "from"), 268 ("hlim=[0-9]*", "hlim="), 269 ("ttl=[0-9]*", "ttl="), 270 ("time=[0-9.-]*", "time="), 271 ("\(-[0-9\.]+[0-9]+ ms\)", "(- ms)"), 272 ("[0-9\.]+/[0-9.]+", "/"), 273 ] 274 for pattern, repl in pattern_replacements: 275 output = re.sub(pattern, repl, output) 276 return output 277 278 279class TestPing(SingleVnetTestTemplate): 280 IPV6_PREFIXES: List[str] = ["2001:db8::1/64"] 281 IPV4_PREFIXES: List[str] = ["192.0.2.1/24"] 282 283 # Each param in testdata contains a dictionary with the command, 284 # and the expected outcome (returncode, redacted stdout, and stderr) 285 testdata = [ 286 pytest.param( 287 { 288 "args": "ping -4 -c1 -s56 -t1 localhost", 289 "returncode": 0, 290 "stdout": """\ 291PING localhost: 56 data bytes 29264 bytes from: icmp_seq=0 ttl= time= ms 293 294--- localhost ping statistics --- 2951 packets transmitted, 1 packets received, 0.0% packet loss 296round-trip min/avg/max/stddev = /// ms 297""", 298 "stderr": "", 299 }, 300 id="_4_c1_s56_t1_localhost", 301 ), 302 pytest.param( 303 { 304 "args": "ping -6 -c1 -s8 -t1 localhost", 305 "returncode": 0, 306 "stdout": """\ 307PING6(56=40+8+8 bytes) ::1 --> ::1 30816 bytes from ::1, icmp_seq=0 hlim= time= ms 309 310--- localhost ping6 statistics --- 3111 packets transmitted, 1 packets received, 0.0% packet loss 312round-trip min/avg/max/std-dev = /// ms 313""", 314 "stderr": "", 315 }, 316 id="_6_c1_s8_t1_localhost", 317 ), 318 pytest.param( 319 { 320 "args": "ping -A -c1 192.0.2.1", 321 "returncode": 0, 322 "stdout": """\ 323PING 192.0.2.1 (192.0.2.1): 56 data bytes 32464 bytes from: icmp_seq=0 ttl= time= ms 325 326--- 192.0.2.1 ping statistics --- 3271 packets transmitted, 1 packets received, 0.0% packet loss 328round-trip min/avg/max/stddev = /// ms 329""", 330 "stderr": "", 331 }, 332 id="_A_c1_192_0_2_1", 333 ), 334 pytest.param( 335 { 336 "args": "ping -A -c1 192.0.2.2", 337 "returncode": 2, 338 "stdout": """\ 339PING 192.0.2.2 (192.0.2.2): 56 data bytes 340 341--- 192.0.2.2 ping statistics --- 3421 packets transmitted, 0 packets received, 100.0% packet loss 343""", 344 "stderr": "", 345 }, 346 id="_A_c1_192_0_2_2", 347 ), 348 pytest.param( 349 { 350 "args": "ping -A -c1 2001:db8::1", 351 "returncode": 0, 352 "stdout": """\ 353PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 35416 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms 355 356--- 2001:db8::1 ping6 statistics --- 3571 packets transmitted, 1 packets received, 0.0% packet loss 358round-trip min/avg/max/std-dev = /// ms 359""", 360 "stderr": "", 361 }, 362 id="_A_c1_2001_db8__1", 363 ), 364 pytest.param( 365 { 366 "args": "ping -A -c1 2001:db8::2", 367 "returncode": 2, 368 "stdout": """\ 369PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 370 371--- 2001:db8::2 ping6 statistics --- 3721 packets transmitted, 0 packets received, 100.0% packet loss 373""", 374 "stderr": "", 375 }, 376 id="_A_c1_2001_db8__2", 377 ), 378 pytest.param( 379 { 380 "args": "ping -A -c3 192.0.2.1", 381 "returncode": 0, 382 "stdout": """\ 383PING 192.0.2.1 (192.0.2.1): 56 data bytes 38464 bytes from: icmp_seq=0 ttl= time= ms 38564 bytes from: icmp_seq=1 ttl= time= ms 38664 bytes from: icmp_seq=2 ttl= time= ms 387 388--- 192.0.2.1 ping statistics --- 3893 packets transmitted, 3 packets received, 0.0% packet loss 390round-trip min/avg/max/stddev = /// ms 391""", 392 "stderr": "", 393 }, 394 id="_A_3_192_0.2.1", 395 ), 396 pytest.param( 397 { 398 "args": "ping -A -c3 192.0.2.2", 399 "returncode": 2, 400 "stdout": """\ 401\x07\x07PING 192.0.2.2 (192.0.2.2): 56 data bytes 402 403--- 192.0.2.2 ping statistics --- 4043 packets transmitted, 0 packets received, 100.0% packet loss 405""", 406 "stderr": "", 407 }, 408 id="_A_c3_192_0_2_2", 409 ), 410 pytest.param( 411 { 412 "args": "ping -A -c3 2001:db8::1", 413 "returncode": 0, 414 "stdout": """\ 415PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 41616 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms 41716 bytes from 2001:db8::1, icmp_seq=1 hlim= time= ms 41816 bytes from 2001:db8::1, icmp_seq=2 hlim= time= ms 419 420--- 2001:db8::1 ping6 statistics --- 4213 packets transmitted, 3 packets received, 0.0% packet loss 422round-trip min/avg/max/std-dev = /// ms 423""", 424 "stderr": "", 425 }, 426 id="_A_c3_2001_db8__1", 427 ), 428 pytest.param( 429 { 430 "args": "ping -A -c3 2001:db8::2", 431 "returncode": 2, 432 "stdout": """\ 433\x07\x07PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 434 435--- 2001:db8::2 ping6 statistics --- 4363 packets transmitted, 0 packets received, 100.0% packet loss 437""", 438 "stderr": "", 439 }, 440 id="_A_c3_2001_db8__2", 441 ), 442 pytest.param( 443 { 444 "args": "ping -c1 192.0.2.1", 445 "returncode": 0, 446 "stdout": """\ 447PING 192.0.2.1 (192.0.2.1): 56 data bytes 44864 bytes from: icmp_seq=0 ttl= time= ms 449 450--- 192.0.2.1 ping statistics --- 4511 packets transmitted, 1 packets received, 0.0% packet loss 452round-trip min/avg/max/stddev = /// ms 453""", 454 "stderr": "", 455 }, 456 id="_c1_192_0_2_1", 457 ), 458 pytest.param( 459 { 460 "args": "ping -c1 192.0.2.2", 461 "returncode": 2, 462 "stdout": """\ 463PING 192.0.2.2 (192.0.2.2): 56 data bytes 464 465--- 192.0.2.2 ping statistics --- 4661 packets transmitted, 0 packets received, 100.0% packet loss 467""", 468 "stderr": "", 469 }, 470 id="_c1_192_0_2_2", 471 ), 472 pytest.param( 473 { 474 "args": "ping -c1 2001:db8::1", 475 "returncode": 0, 476 "stdout": """\ 477PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 47816 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms 479 480--- 2001:db8::1 ping6 statistics --- 4811 packets transmitted, 1 packets received, 0.0% packet loss 482round-trip min/avg/max/std-dev = /// ms 483""", 484 "stderr": "", 485 }, 486 id="_c1_2001_db8__1", 487 ), 488 pytest.param( 489 { 490 "args": "ping -c1 2001:db8::2", 491 "returncode": 2, 492 "stdout": """\ 493PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 494 495--- 2001:db8::2 ping6 statistics --- 4961 packets transmitted, 0 packets received, 100.0% packet loss 497""", 498 "stderr": "", 499 }, 500 id="_c1_2001_db8__2", 501 ), 502 pytest.param( 503 { 504 "args": "ping -c1 -S127.0.0.1 -s56 -t1 localhost", 505 "returncode": 0, 506 "stdout": """\ 507PING localhost from: 56 data bytes 50864 bytes from: icmp_seq=0 ttl= time= ms 509 510--- localhost ping statistics --- 5111 packets transmitted, 1 packets received, 0.0% packet loss 512round-trip min/avg/max/stddev = /// ms 513""", 514 "stderr": "", 515 }, 516 id="_c1_S127_0_0_1_s56_t1_localhost", 517 ), 518 pytest.param( 519 { 520 "args": "ping -c1 -S::1 -s8 -t1 localhost", 521 "returncode": 0, 522 "stdout": """\ 523PING6(56=40+8+8 bytes) ::1 --> ::1 52416 bytes from ::1, icmp_seq=0 hlim= time= ms 525 526--- localhost ping6 statistics --- 5271 packets transmitted, 1 packets received, 0.0% packet loss 528round-trip min/avg/max/std-dev = /// ms 529""", 530 "stderr": "", 531 }, 532 id="_c1_S__1_s8_t1_localhost", 533 ), 534 pytest.param( 535 { 536 "args": "ping -c3 192.0.2.1", 537 "returncode": 0, 538 "stdout": """\ 539PING 192.0.2.1 (192.0.2.1): 56 data bytes 54064 bytes from: icmp_seq=0 ttl= time= ms 54164 bytes from: icmp_seq=1 ttl= time= ms 54264 bytes from: icmp_seq=2 ttl= time= ms 543 544--- 192.0.2.1 ping statistics --- 5453 packets transmitted, 3 packets received, 0.0% packet loss 546round-trip min/avg/max/stddev = /// ms 547""", 548 "stderr": "", 549 }, 550 id="_c3_192_0_2_1", 551 ), 552 pytest.param( 553 { 554 "args": "ping -c3 192.0.2.2", 555 "returncode": 2, 556 "stdout": """\ 557PING 192.0.2.2 (192.0.2.2): 56 data bytes 558 559--- 192.0.2.2 ping statistics --- 5603 packets transmitted, 0 packets received, 100.0% packet loss 561""", 562 "stderr": "", 563 }, 564 id="_c3_192_0_2_2", 565 ), 566 pytest.param( 567 { 568 "args": "ping -c3 2001:db8::1", 569 "returncode": 0, 570 "stdout": """\ 571PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 57216 bytes from 2001:db8::1, icmp_seq=0 hlim= time= ms 57316 bytes from 2001:db8::1, icmp_seq=1 hlim= time= ms 57416 bytes from 2001:db8::1, icmp_seq=2 hlim= time= ms 575 576--- 2001:db8::1 ping6 statistics --- 5773 packets transmitted, 3 packets received, 0.0% packet loss 578round-trip min/avg/max/std-dev = /// ms 579""", 580 "stderr": "", 581 }, 582 id="_c3_2001_db8__1", 583 ), 584 pytest.param( 585 { 586 "args": "ping -c3 2001:db8::2", 587 "returncode": 2, 588 "stdout": """\ 589PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 590 591--- 2001:db8::2 ping6 statistics --- 5923 packets transmitted, 0 packets received, 100.0% packet loss 593""", 594 "stderr": "", 595 }, 596 id="_c3_2001_db8__2", 597 ), 598 pytest.param( 599 { 600 "args": "ping -q -c1 192.0.2.1", 601 "returncode": 0, 602 "stdout": """\ 603PING 192.0.2.1 (192.0.2.1): 56 data bytes 604 605--- 192.0.2.1 ping statistics --- 6061 packets transmitted, 1 packets received, 0.0% packet loss 607round-trip min/avg/max/stddev = /// ms 608""", 609 "stderr": "", 610 }, 611 id="_q_c1_192_0_2_1", 612 ), 613 pytest.param( 614 { 615 "args": "ping -q -c1 192.0.2.2", 616 "returncode": 2, 617 "stdout": """\ 618PING 192.0.2.2 (192.0.2.2): 56 data bytes 619 620--- 192.0.2.2 ping statistics --- 6211 packets transmitted, 0 packets received, 100.0% packet loss 622""", 623 "stderr": "", 624 }, 625 id="_q_c1_192_0_2_2", 626 ), 627 pytest.param( 628 { 629 "args": "ping -q -c1 2001:db8::1", 630 "returncode": 0, 631 "stdout": """\ 632PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 633 634--- 2001:db8::1 ping6 statistics --- 6351 packets transmitted, 1 packets received, 0.0% packet loss 636round-trip min/avg/max/std-dev = /// ms 637""", 638 "stderr": "", 639 }, 640 id="_q_c1_2001_db8__1", 641 ), 642 pytest.param( 643 { 644 "args": "ping -q -c1 2001:db8::2", 645 "returncode": 2, 646 "stdout": """\ 647PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 648 649--- 2001:db8::2 ping6 statistics --- 6501 packets transmitted, 0 packets received, 100.0% packet loss 651""", 652 "stderr": "", 653 }, 654 id="_q_c1_2001_db8__2", 655 ), 656 pytest.param( 657 { 658 "args": "ping -q -c3 192.0.2.1", 659 "returncode": 0, 660 "stdout": """\ 661PING 192.0.2.1 (192.0.2.1): 56 data bytes 662 663--- 192.0.2.1 ping statistics --- 6643 packets transmitted, 3 packets received, 0.0% packet loss 665round-trip min/avg/max/stddev = /// ms 666""", 667 "stderr": "", 668 }, 669 id="_q_c3_192_0_2_1", 670 ), 671 pytest.param( 672 { 673 "args": "ping -q -c3 192.0.2.2", 674 "returncode": 2, 675 "stdout": """\ 676PING 192.0.2.2 (192.0.2.2): 56 data bytes 677 678--- 192.0.2.2 ping statistics --- 6793 packets transmitted, 0 packets received, 100.0% packet loss 680""", 681 "stderr": "", 682 }, 683 id="_q_c3_192_0_2_2", 684 ), 685 pytest.param( 686 { 687 "args": "ping -q -c3 2001:db8::1", 688 "returncode": 0, 689 "stdout": """\ 690PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::1 691 692--- 2001:db8::1 ping6 statistics --- 6933 packets transmitted, 3 packets received, 0.0% packet loss 694round-trip min/avg/max/std-dev = /// ms 695""", 696 "stderr": "", 697 }, 698 id="_q_c3_2001_db8__1", 699 ), 700 pytest.param( 701 { 702 "args": "ping -q -c3 2001:db8::2", 703 "returncode": 2, 704 "stdout": """\ 705PING6(56=40+8+8 bytes) 2001:db8::1 --> 2001:db8::2 706 707--- 2001:db8::2 ping6 statistics --- 7083 packets transmitted, 0 packets received, 100.0% packet loss 709""", 710 "stderr": "", 711 }, 712 id="_q_c3_2001_db8__2", 713 ), 714 ] 715 716 @pytest.mark.parametrize("expected", testdata) 717 def test_ping(self, expected): 718 """Test ping""" 719 ping = subprocess.run( 720 expected["args"].split(), 721 capture_output=True, 722 timeout=15, 723 text=True, 724 ) 725 assert ping.returncode == expected["returncode"] 726 assert redact(ping.stdout) == expected["stdout"] 727 assert ping.stderr == expected["stderr"] 728 729 # Each param in ping46_testdata contains a dictionary with the arguments 730 # and the expected outcome (returncode, redacted stdout, and stderr) 731 # common to `ping -4` and `ping -6` 732 ping46_testdata = [ 733 pytest.param( 734 { 735 "args": "-Wx localhost", 736 "returncode": os.EX_USAGE, 737 "stdout": "", 738 "stderr": "ping: invalid timing interval: `x'\n", 739 }, 740 id="_Wx_localhost", 741 ), 742 ] 743 744 @pytest.mark.parametrize("expected", ping46_testdata) 745 def test_ping_46(self, expected): 746 """Test ping -4/ping -6""" 747 for version in [4, 6]: 748 ping = subprocess.run( 749 ["ping", f"-{version}"] + expected["args"].split(), 750 capture_output=True, 751 timeout=15, 752 text=True, 753 ) 754 assert ping.returncode == expected["returncode"] 755 assert redact(ping.stdout) == expected["stdout"] 756 assert ping.stderr == expected["stderr"] 757 758 # Each param in pinger_testdata contains a dictionary with the keywords to 759 # `pinger()` and a dictionary with the expected outcome (returncode, 760 # stdout, stderr, and if ping's output is redacted) 761 pinger_testdata = [ 762 pytest.param( 763 { 764 "src": "192.0.2.1", 765 "dst": "192.0.2.2", 766 "icmp_type": 0, 767 "icmp_code": 0, 768 }, 769 { 770 "returncode": 0, 771 "stdout": """\ 772PING 192.0.2.2 (192.0.2.2): 56 data bytes 77364 bytes from: icmp_seq=0 ttl= time= ms 774 775--- 192.0.2.2 ping statistics --- 7761 packets transmitted, 1 packets received, 0.0% packet loss 777round-trip min/avg/max/stddev = /// ms 778""", 779 "stderr": "", 780 "redacted": True, 781 }, 782 id="_0_0", 783 ), 784 pytest.param( 785 { 786 "src": "192.0.2.1", 787 "dst": "192.0.2.2", 788 "icmp_type": 0, 789 "icmp_code": 0, 790 "opts": "EOL", 791 }, 792 { 793 "returncode": 0, 794 "stdout": """\ 795PING 192.0.2.2 (192.0.2.2): 56 data bytes 79664 bytes from: icmp_seq=0 ttl= time= ms 797wrong total length 88 instead of 84 798 799--- 192.0.2.2 ping statistics --- 8001 packets transmitted, 1 packets received, 0.0% packet loss 801round-trip min/avg/max/stddev = /// ms 802""", 803 "stderr": "", 804 "redacted": True, 805 }, 806 id="_0_0_opts_EOL", 807 ), 808 pytest.param( 809 { 810 "src": "192.0.2.1", 811 "dst": "192.0.2.2", 812 "icmp_type": 0, 813 "icmp_code": 0, 814 "opts": "LSRR", 815 }, 816 { 817 "returncode": 0, 818 "stdout": """\ 819PING 192.0.2.2 (192.0.2.2): 56 data bytes 82064 bytes from: icmp_seq=0 ttl= time= ms 821LSRR: 192.0.2.10 822 192.0.2.20 823 192.0.2.30 824 192.0.2.40 825 192.0.2.50 826 192.0.2.60 827 192.0.2.70 828 192.0.2.80 829 192.0.2.90 830 831--- 192.0.2.2 ping statistics --- 8321 packets transmitted, 1 packets received, 0.0% packet loss 833round-trip min/avg/max/stddev = /// ms 834""", 835 "stderr": "", 836 "redacted": True, 837 }, 838 id="_0_0_opts_LSRR", 839 ), 840 pytest.param( 841 { 842 "src": "192.0.2.1", 843 "dst": "192.0.2.2", 844 "icmp_type": 0, 845 "icmp_code": 0, 846 "opts": "LSRR-trunc", 847 }, 848 { 849 "returncode": 0, 850 "stdout": """\ 851PING 192.0.2.2 (192.0.2.2): 56 data bytes 85264 bytes from: icmp_seq=0 ttl= time= ms 853LSRR: (truncated route) 854 855 856--- 192.0.2.2 ping statistics --- 8571 packets transmitted, 1 packets received, 0.0% packet loss 858round-trip min/avg/max/stddev = /// ms 859""", 860 "stderr": "", 861 "redacted": True, 862 }, 863 id="_0_0_opts_LSRR_trunc", 864 ), 865 pytest.param( 866 { 867 "src": "192.0.2.1", 868 "dst": "192.0.2.2", 869 "icmp_type": 0, 870 "icmp_code": 0, 871 "opts": "SSRR", 872 }, 873 { 874 "returncode": 0, 875 "stdout": """\ 876PING 192.0.2.2 (192.0.2.2): 56 data bytes 87764 bytes from: icmp_seq=0 ttl= time= ms 878SSRR: 192.0.2.10 879 192.0.2.20 880 192.0.2.30 881 192.0.2.40 882 192.0.2.50 883 192.0.2.60 884 192.0.2.70 885 192.0.2.80 886 192.0.2.90 887 888--- 192.0.2.2 ping statistics --- 8891 packets transmitted, 1 packets received, 0.0% packet loss 890round-trip min/avg/max/stddev = /// ms 891""", 892 "stderr": "", 893 "redacted": True, 894 }, 895 id="_0_0_opts_SSRR", 896 ), 897 pytest.param( 898 { 899 "src": "192.0.2.1", 900 "dst": "192.0.2.2", 901 "icmp_type": 0, 902 "icmp_code": 0, 903 "opts": "SSRR-trunc", 904 }, 905 { 906 "returncode": 0, 907 "stdout": """\ 908PING 192.0.2.2 (192.0.2.2): 56 data bytes 90964 bytes from: icmp_seq=0 ttl= time= ms 910SSRR: (truncated route) 911 912 913--- 192.0.2.2 ping statistics --- 9141 packets transmitted, 1 packets received, 0.0% packet loss 915round-trip min/avg/max/stddev = /// ms 916""", 917 "stderr": "", 918 "redacted": True, 919 }, 920 id="_0_0_opts_SSRR_trunc", 921 ), 922 pytest.param( 923 { 924 "src": "192.0.2.1", 925 "dst": "192.0.2.2", 926 "icmp_type": 0, 927 "icmp_code": 0, 928 "opts": "RR", 929 }, 930 { 931 "returncode": 0, 932 "stdout": """\ 933PING 192.0.2.2 (192.0.2.2): 56 data bytes 93464 bytes from: icmp_seq=0 ttl= time= ms 935RR: 192.0.2.10 936 192.0.2.20 937 192.0.2.30 938 192.0.2.40 939 192.0.2.50 940 192.0.2.60 941 192.0.2.70 942 192.0.2.80 943 192.0.2.90 944 945--- 192.0.2.2 ping statistics --- 9461 packets transmitted, 1 packets received, 0.0% packet loss 947round-trip min/avg/max/stddev = /// ms 948""", 949 "stderr": "", 950 "redacted": True, 951 }, 952 id="_0_0_opts_RR", 953 ), 954 pytest.param( 955 { 956 "src": "192.0.2.1", 957 "dst": "192.0.2.2", 958 "icmp_type": 0, 959 "icmp_code": 0, 960 "opts": "RR-same", 961 }, 962 { 963 "returncode": 0, 964 "stdout": """\ 965PING 192.0.2.2 (192.0.2.2): 56 data bytes 96664 bytes from: icmp_seq=0 ttl= time= ms (same route) 967 968--- 192.0.2.2 ping statistics --- 9691 packets transmitted, 1 packets received, 0.0% packet loss 970round-trip min/avg/max/stddev = /// ms 971""", 972 "stderr": "", 973 "redacted": True, 974 }, 975 id="_0_0_opts_RR_same", 976 ), 977 pytest.param( 978 { 979 "src": "192.0.2.1", 980 "dst": "192.0.2.2", 981 "icmp_type": 0, 982 "icmp_code": 0, 983 "opts": "RR-trunc", 984 }, 985 { 986 "returncode": 0, 987 "stdout": """\ 988PING 192.0.2.2 (192.0.2.2): 56 data bytes 98964 bytes from: icmp_seq=0 ttl= time= ms 990RR: (truncated route) 991 992--- 192.0.2.2 ping statistics --- 9931 packets transmitted, 1 packets received, 0.0% packet loss 994round-trip min/avg/max/stddev = /// ms 995""", 996 "stderr": "", 997 "redacted": True, 998 }, 999 id="_0_0_opts_RR_trunc", 1000 ), 1001 pytest.param( 1002 { 1003 "src": "192.0.2.1", 1004 "dst": "192.0.2.2", 1005 "icmp_type": 0, 1006 "icmp_code": 0, 1007 "opts": "NOP", 1008 }, 1009 { 1010 "returncode": 0, 1011 "stdout": """\ 1012PING 192.0.2.2 (192.0.2.2): 56 data bytes 101364 bytes from: icmp_seq=0 ttl= time= ms 1014wrong total length 88 instead of 84 1015NOP 1016 1017--- 192.0.2.2 ping statistics --- 10181 packets transmitted, 1 packets received, 0.0% packet loss 1019round-trip min/avg/max/stddev = /// ms 1020""", 1021 "stderr": "", 1022 "redacted": True, 1023 }, 1024 id="_0_0_opts_NOP", 1025 ), 1026 pytest.param( 1027 { 1028 "src": "192.0.2.1", 1029 "dst": "192.0.2.2", 1030 "icmp_type": 0, 1031 "icmp_code": 0, 1032 "opts": "NOP-40", 1033 }, 1034 { 1035 "returncode": 0, 1036 "stdout": """\ 1037PING 192.0.2.2 (192.0.2.2): 56 data bytes 103864 bytes from: icmp_seq=0 ttl= time= ms 1039wrong total length 124 instead of 84 1040NOP 1041NOP 1042NOP 1043NOP 1044NOP 1045NOP 1046NOP 1047NOP 1048NOP 1049NOP 1050NOP 1051NOP 1052NOP 1053NOP 1054NOP 1055NOP 1056NOP 1057NOP 1058NOP 1059NOP 1060NOP 1061NOP 1062NOP 1063NOP 1064NOP 1065NOP 1066NOP 1067NOP 1068NOP 1069NOP 1070NOP 1071NOP 1072NOP 1073NOP 1074NOP 1075NOP 1076NOP 1077NOP 1078NOP 1079NOP 1080 1081--- 192.0.2.2 ping statistics --- 10821 packets transmitted, 1 packets received, 0.0% packet loss 1083round-trip min/avg/max/stddev = /// ms 1084""", 1085 "stderr": "", 1086 "redacted": True, 1087 }, 1088 id="_0_0_opts_NOP_40", 1089 ), 1090 pytest.param( 1091 { 1092 "src": "192.0.2.1", 1093 "dst": "192.0.2.2", 1094 "icmp_type": 0, 1095 "icmp_code": 0, 1096 "opts": "unk", 1097 }, 1098 { 1099 "returncode": 0, 1100 "stdout": """\ 1101PING 192.0.2.2 (192.0.2.2): 56 data bytes 110264 bytes from: icmp_seq=0 ttl= time= ms 1103wrong total length 88 instead of 84 1104unknown option 9f 1105 1106--- 192.0.2.2 ping statistics --- 11071 packets transmitted, 1 packets received, 0.0% packet loss 1108round-trip min/avg/max/stddev = /// ms 1109""", 1110 "stderr": "", 1111 "redacted": True, 1112 }, 1113 id="_0_0_opts_unk", 1114 ), 1115 pytest.param( 1116 { 1117 "src": "192.0.2.1", 1118 "dst": "192.0.2.2", 1119 "icmp_type": 3, 1120 "icmp_code": 1, 1121 "opts": "NOP-40", 1122 }, 1123 { 1124 "returncode": 2, 1125 "stdout": """\ 1126PING 192.0.2.2 (192.0.2.2): 56 data bytes 1127132 bytes from 192.0.2.2: Destination Host Unreachable 1128Vr HL TOS Len ID Flg off TTL Pro cks Src Dst 1129 4 f 00 007c 0001 0 0000 40 01 d868 192.0.2.1 192.0.2.2 01010101010101010101010101010101010101010101010101010101010101010101010101010101 1130 1131 1132--- 192.0.2.2 ping statistics --- 11331 packets transmitted, 0 packets received, 100.0% packet loss 1134""", 1135 "stderr": "", 1136 "redacted": False, 1137 }, 1138 id="_3_1_opts_NOP_40", 1139 ), 1140 pytest.param( 1141 { 1142 "src": "192.0.2.1", 1143 "dst": "192.0.2.2", 1144 "icmp_type": 3, 1145 "icmp_code": 1, 1146 "flags": "DF", 1147 }, 1148 { 1149 "returncode": 2, 1150 "stdout": """\ 1151PING 192.0.2.2 (192.0.2.2): 56 data bytes 115292 bytes from 192.0.2.2: Destination Host Unreachable 1153Vr HL TOS Len ID Flg off TTL Pro cks Src Dst 1154 4 5 00 0054 0001 2 0000 40 01 b6a4 192.0.2.1 192.0.2.2 1155 1156 1157--- 192.0.2.2 ping statistics --- 11581 packets transmitted, 0 packets received, 100.0% packet loss 1159""", 1160 "stderr": "", 1161 "redacted": False, 1162 }, 1163 id="_3_1_flags_DF", 1164 ), 1165 pytest.param( 1166 { 1167 "src": "192.0.2.1", 1168 "dst": "192.0.2.2", 1169 "icmp_type": 3, 1170 "icmp_code": 1, 1171 "special": "tcp", 1172 }, 1173 { 1174 "returncode": 2, 1175 "stdout": """\ 1176PATTERN: 0x01 1177PING 192.0.2.2 (192.0.2.2): 56 data bytes 1178 1179--- 192.0.2.2 ping statistics --- 11801 packets transmitted, 0 packets received, 100.0% packet loss 1181""", 1182 "stderr": """\ 1183ping: quoted data too short (40 bytes) from 192.0.2.2 1184""", 1185 "redacted": False, 1186 }, 1187 id="_3_1_special_tcp", 1188 ), 1189 pytest.param( 1190 { 1191 "src": "192.0.2.1", 1192 "dst": "192.0.2.2", 1193 "icmp_type": 3, 1194 "icmp_code": 1, 1195 "special": "udp", 1196 }, 1197 { 1198 "returncode": 2, 1199 "stdout": """\ 1200PATTERN: 0x01 1201PING 192.0.2.2 (192.0.2.2): 56 data bytes 1202 1203--- 192.0.2.2 ping statistics --- 12041 packets transmitted, 0 packets received, 100.0% packet loss 1205""", 1206 "stderr": """\ 1207ping: quoted data too short (28 bytes) from 192.0.2.2 1208""", 1209 "redacted": False, 1210 }, 1211 id="_3_1_special_udp", 1212 ), 1213 pytest.param( 1214 { 1215 "src": "192.0.2.1", 1216 "dst": "192.0.2.2", 1217 "icmp_type": 0, 1218 "icmp_code": 0, 1219 "special": "warp", 1220 }, 1221 { 1222 "returncode": 0, 1223 "stdout": """\ 1224PATTERN: 0x01 1225PING 192.0.2.2 (192.0.2.2): 56 data bytes 122664 bytes from: icmp_seq=0 ttl= time= ms 1227 1228--- 192.0.2.2 ping statistics --- 12291 packets transmitted, 1 packets received, 0.0% packet loss 1230round-trip min/avg/max/stddev = /// ms 1231""", 1232 "stderr": """\ 1233ping: time of day goes back (- ms), clamping time to 0 1234""", 1235 "redacted": True, 1236 }, 1237 id="_0_0_special_warp", 1238 ), 1239 ] 1240 1241 @pytest.mark.parametrize("pinger_kargs, expected", pinger_testdata) 1242 @pytest.mark.require_progs(["scapy"]) 1243 @pytest.mark.require_user("root") 1244 def test_pinger(self, pinger_kargs, expected): 1245 """Test ping using pinger(), a reply faker""" 1246 iface = IfaceFactory().create_iface("", "tun")[0].name 1247 ping = pinger(iface, **pinger_kargs) 1248 assert ping.returncode == expected["returncode"] 1249 if expected["redacted"]: 1250 assert redact(ping.stdout) == expected["stdout"] 1251 assert redact(ping.stderr) == expected["stderr"] 1252 else: 1253 assert ping.stdout == expected["stdout"] 1254 assert ping.stderr == expected["stderr"] 1255