import errno
import json
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

import pytest
from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insns import Insn
from atf_python.sys.netpfil.ipfw.insns import InsnComment
from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
from atf_python.sys.netpfil.ipfw.insns import InsnIp
from atf_python.sys.netpfil.ipfw.insns import InsnIp6
from atf_python.sys.netpfil.ipfw.insns import InsnPorts
from atf_python.sys.netpfil.ipfw.insns import InsnProb
from atf_python.sys.netpfil.ipfw.insns import InsnProto
from atf_python.sys.netpfil.ipfw.insns import InsnReject
from atf_python.sys.netpfil.ipfw.insns import InsnTable
from atf_python.sys.netpfil.ipfw.insns import InsnU32
from atf_python.sys.netpfil.ipfw.insns import InsnKidx
from atf_python.sys.netpfil.ipfw.insns import InsnLookup
from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableLookupType
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableValueType
from atf_python.sys.netpfil.ipfw.ioctl import CTlv
from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
from atf_python.sys.netpfil.ipfw.ioctl import NTlv
from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
from atf_python.sys.netpfil.ipfw.ioctl import RawRule
from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.utils import BaseTest


IPFW_PATH = "/sbin/ipfw"


def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree with the one actually produced.

    Both objects must support ``bytes()`` and expose ``obj_list``,
    ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.

    The children are walked in order; on the first child whose bytes
    differ the function recurses into it, so the diagnostics printed to
    stdout point at the deepest mismatching object.

    Args:
        w_obj: the wanted (expected) object.
        g_obj: the object that was actually got.
        w_stack: names of the wanted-side ancestors of ``w_obj``; used only
            for the printed chain.  Defaults to an empty chain.
        g_stack: same as ``w_stack`` for the got side.

    Returns:
        True if ``bytes(w_obj) == bytes(g_obj)``; False otherwise (after
        printing a description of the mismatch).
    """
    # The defaults used to be shared mutable lists, so names appended during
    # one comparison leaked into the chains printed by every later one.
    w_stack = [] if w_stack is None else w_stack
    g_stack = [] if g_stack is None else g_stack

    if bytes(w_obj) == bytes(g_obj):
        return True
    num_objects = 0
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # Descend with fresh lists: the caller's lists stay untouched.
        if not differ(
            w_child,
            g_child,
            w_stack + [w_obj.obj_name],
            g_stack + [g_obj.obj_name],
        ):
            return False
        break
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        # Every wanted child matched, but the got object carries more.
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False


class TestAddRule(BaseTest):
    """Checks the requests recorded for ``ipfw add ...`` command lines.

    Each test builds the expected ``IpFwXRule`` from instruction objects and
    compares it byte-for-byte with the single record that
    ``DebugIoReader(IPFW_PATH)`` returns for the command line.
    """

    def compile_rule(self, out):
        """Build the expected ``IP_FW_XADD`` request from a rule spec.

        Args:
            out: dict with the key ``"insns"`` (list of instruction objects)
                and the optional keys ``"rulenum"`` (defaults to 0) and
                ``"objs"`` (list of name TLVs, wrapped into an
                ``IPFW_TLV_TBLNAME_LIST`` TLV placed before the rule).

        Returns:
            The ``IpFwXRule`` object describing the expected request.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Assert that ``in_data`` produces the request described by ``out_data``.

        Args:
            in_data: ipfw command line (without the program name), passed to
                ``DebugIoReader.get_records()``.
            out_data: rule spec as accepted by ``compile_rule()``.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            # Dump both full objects to make the assertion failure readable.
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip4 from 0.0.0.0/0 to 192.0.2.1/0",
                    "out": {
                        "insns": [
                            InsnEmpty(IpFwOpcode.O_IP4),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_zero_addrmask4",
            ),
            pytest.param(
                {
                    "in": "add allow ip6 from ::/0 to 2001:DB8::/0",
                    "out": {
                        "insns": [
                            InsnEmpty(IpFwOpcode.O_IP6),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_zero_addrmask6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnKidx(IpFwOpcode.O_IP_DST_LOOKUP, kidx=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP
                                ),
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_no_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup mark:0xf00baa1 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_MARK,
                                    bitmask=True,
                                ),
                                bitmask=0xf00baa1,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_u32_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-mac:1a:2b:3c:4d:5e:6f BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_MAC,
                                    bitmask=True,
                                ),
                                bitmask="1a:2b:3c:4d:5e:6f",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_mac_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip4:1715004 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP4,
                                    bitmask=True,
                                ),
                                bitmask="60.43.26.0",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip4_numeric",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-ip4:0.0.0.255 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_IP4,
                                    bitmask=True,
                                ),
                                bitmask="0.0.0.255",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_src_ip4_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup jail:0.0.252.128 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_JAIL,
                                    bitmask=True,
                                ),
                                bitmask=64640,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_jail_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip6:ffff:ffff:f00:baaa:b00c:: BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP6,
                                    bitmask=True,
                                ),
                                bitmask="ffff:ffff:f00:baaa:b00c::",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,16777215) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_TAG
                                ),
                                value=16777215,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_legacy",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nat=1231) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NAT
                                ),
                                value=1231,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nat",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh4=10.20.30.40) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH4
                                ),
                                value="10.20.30.40",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh4",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh6=ff02:1234:b00c::abcd) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH6
                                ),
                                value="ff02:1234:b00c::abcd",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction nptv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="nptv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_INSTANCE, kidx=2),
                        ],
                    },
                },
                id="test_eaction_nptv6",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            InsnKidx(IpFwOpcode.O_CHECK_STATE, kidx=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_PROBE_STATE, kidx=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42"
            ),
            pytest.param(
                ("skipto 4200", InsnU32(IpFwOpcode.O_SKIPTO, u32=4200)),
                id="skipto_4200",
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420"
            ),
            # TOK_FORWARD
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if the compiled rule is sane and matches the spec"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests port lists given on the source or destination side.

        The port specification is placed after the source address for
        ``O_IP_SRCPORT`` and after the destination address for
        ``O_IP_DSTPORT``; the rule must contain the matching ``InsnPorts``
        followed by the accept action.
        """
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)