import errno
import json
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

import pytest
from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insns import Insn
from atf_python.sys.netpfil.ipfw.insns import InsnComment
from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
from atf_python.sys.netpfil.ipfw.insns import InsnIp
from atf_python.sys.netpfil.ipfw.insns import InsnIp6
from atf_python.sys.netpfil.ipfw.insns import InsnPorts
from atf_python.sys.netpfil.ipfw.insns import InsnProb
from atf_python.sys.netpfil.ipfw.insns import InsnProto
from atf_python.sys.netpfil.ipfw.insns import InsnReject
from atf_python.sys.netpfil.ipfw.insns import InsnTable
from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
from atf_python.sys.netpfil.ipfw.ioctl import CTlv
from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
from atf_python.sys.netpfil.ipfw.ioctl import NTlv
from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
from atf_python.sys.netpfil.ipfw.ioctl import RawRule
from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.utils import BaseTest


IPFW_PATH = "/sbin/ipfw"


def differ(
    w_obj,
    g_obj,
    w_stack: Optional[List[str]] = None,
    g_stack: Optional[List[str]] = None,
) -> bool:
    """Compare a wanted object tree against the one actually produced.

    Both objects must be convertible with ``bytes()`` and expose
    ``obj_list``, ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.
    When the serialized forms differ, the function walks the children in
    order, descends into the first child pair that does not match and prints
    a description of the mismatch.

    Args:
        w_obj: The wanted (expected) object.
        g_obj: The object that was actually produced.
        w_stack: Names of the wanted objects visited so far, outermost
            first.  A list passed in by the caller is extended in place.
        g_stack: Same as ``w_stack``, for the produced objects.

    Returns:
        True when ``bytes(w_obj) == bytes(g_obj)``, False otherwise.
    """
    # The stacks must be created per call.  Using list literals as defaults
    # would share one list between every top-level comparison, so the chain
    # printed for one failure would still contain names appended while
    # diagnosing earlier ones.
    if w_stack is None:
        w_stack = []
    if g_stack is None:
        g_stack = []

    if bytes(w_obj) == bytes(g_obj):
        return True
    num_objects = 0
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # First mismatching child: record the current level in the chain
        # and let the nested call report the details.
        w_stack.append(w_obj.obj_name)
        g_stack.append(g_obj.obj_name)
        if not differ(w_child, g_child, w_stack, g_stack):
            return False
        # Only reached if the nested call found the children identical.
        break
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        # Every wanted child matched, but the produced object has more.
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False


class TestAddRule(BaseTest):
    """Checks the requests recorded from ipfw "add" commands.

    Each test builds the expected request from instruction objects and
    compares it byte-for-byte with the single record that ``DebugIoReader``
    returns for the command text.
    """

    def compile_rule(self, out):
        """Build the expected ``IP_FW_XADD`` request from a test spec.

        Args:
            out: Dict with a mandatory ``"insns"`` list and optional
                ``"objs"`` (named-object TLVs) and ``"rulenum"`` (defaults
                to 0) entries.

        Returns:
            An ``IpFwXRule`` wrapping the optional object list TLV followed
            by the rule TLV.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Assert that the command ``in_data`` yields the request in ``out_data``.

        Args:
            in_data: Command text handed to ``DebugIoReader.get_records``.
            out_data: Expected rule spec, in the format ``compile_rule``
                accepts.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        # differ() prints where the two trees diverge; the full dumps below
        # add context before the final assertion fails the test.
        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2),
                        ],
                    },
                },
                id="test_eaction_ntp",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_CHECK_STATE, arg1=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROBE_STATE, arg1=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            # NOTE(review): exact duplicate of the previous case (same
            # command, same expected instruction, same id) - confirm whether
            # a different variant was intended here.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
            ),
            # TOK_FORWARD
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        # Each parameter is an (action text, expected instruction) pair.
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            # NOTE(review): test_add_ports passes port_pairs as a list of
            # (low, high) tuples; confirm the flat list used here is
            # intended.
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if the compiled rule is sane and matches the spec"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests port lists on the source and on the destination side"""
        # The port list follows the address it applies to.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)