1import errno 2import json 3import os 4import socket 5import struct 6import subprocess 7import sys 8from ctypes import c_byte 9from ctypes import c_char 10from ctypes import c_int 11from ctypes import c_long 12from ctypes import c_uint32 13from ctypes import c_uint8 14from ctypes import c_ulong 15from ctypes import c_ushort 16from ctypes import sizeof 17from ctypes import Structure 18from enum import Enum 19from typing import Any 20from typing import Dict 21from typing import List 22from typing import NamedTuple 23from typing import Optional 24from typing import Union 25 26import pytest 27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode 28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode 29from atf_python.sys.netpfil.ipfw.insns import Insn 30from atf_python.sys.netpfil.ipfw.insns import InsnComment 31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty 32from atf_python.sys.netpfil.ipfw.insns import InsnIp 33from atf_python.sys.netpfil.ipfw.insns import InsnIp6 34from atf_python.sys.netpfil.ipfw.insns import InsnPorts 35from atf_python.sys.netpfil.ipfw.insns import InsnProb 36from atf_python.sys.netpfil.ipfw.insns import InsnProto 37from atf_python.sys.netpfil.ipfw.insns import InsnReject 38from atf_python.sys.netpfil.ipfw.insns import InsnTable 39from atf_python.sys.netpfil.ipfw.insns import InsnU32 40from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode 41from atf_python.sys.netpfil.ipfw.ioctl import CTlv 42from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule 43from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType 44from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule 45from atf_python.sys.netpfil.ipfw.ioctl import NTlv 46from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType 47from atf_python.sys.netpfil.ipfw.ioctl import RawRule 48from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader 49from atf_python.sys.netpfil.ipfw.utils import enum_from_int 50from atf_python.utils import BaseTest 51 52 53IPFW_PATH = "/sbin/ipfw" 54 55 56def differ(w_obj, g_obj, w_stack=[], g_stack=[]): 57 if bytes(w_obj) == bytes(g_obj): 58 return True 59 num_objects = 0 60 for i, w_child in enumerate(w_obj.obj_list): 61 if i >= len(g_obj.obj_list): 62 print("MISSING object from chain {}".format(" / ".join(w_stack))) 63 w_child.print_obj() 64 print("==========================") 65 return False 66 g_child = g_obj.obj_list[i] 67 if bytes(w_child) == bytes(g_child): 68 num_objects += 1 69 continue 70 w_stack.append(w_obj.obj_name) 71 g_stack.append(g_obj.obj_name) 72 if not differ(w_child, g_child, w_stack, g_stack): 73 return False 74 break 75 if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list): 76 g_child = g_obj.obj_list[num_objects] 77 print("EXTRA object from chain {}".format(" / ".join(g_stack))) 78 g_child.print_obj() 79 print("==========================") 80 return False 81 print("OBJECTS DIFFER") 82 print("WANTED CHAIN: {}".format(" / ".join(w_stack))) 83 w_obj.print_obj() 84 w_obj.print_obj_hex() 85 print("==========================") 86 print("GOT CHAIN: {}".format(" / ".join(g_stack))) 87 g_obj.print_obj() 88 g_obj.print_obj_hex() 89 print("==========================") 90 return False 91 92 93class TestAddRule(BaseTest): 94 def compile_rule(self, out): 95 tlvs = [] 96 if "objs" in out: 97 tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"])) 98 rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"]) 99 tlvs.append(CTlvRule(obj_list=[rule])) 100 return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs) 101 102 def verify_rule(self, in_data: str, out_data): 103 # Prepare the desired output 104 expected = self.compile_rule(out_data) 105 106 reader = DebugIoReader(IPFW_PATH) 107 ioctls = reader.get_records(in_data) 108 assert len(ioctls) == 1 # Only 1 ioctl request expected 109 got = ioctls[0] 110 111 if not differ(expected, got): 112 print("=> CMD: {}".format(in_data)) 113 print("=> WANTED:") 114 expected.print_obj() 115 print("==========================") 116 print("=> GOT:") 117 got.print_obj() 118 print("==========================") 119 assert bytes(got) == bytes(expected) 120 121 @pytest.mark.parametrize( 122 "rule", 123 [ 124 pytest.param( 125 { 126 "in": "add 200 allow ip from any to any", 127 "out": { 128 "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)], 129 "rulenum": 200, 130 }, 131 }, 132 id="test_rulenum", 133 ), 134 pytest.param( 135 { 136 "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any", 137 "out": { 138 "insns": [ 139 InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True), 140 InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"), 141 InsnEmpty(IpFwOpcode.O_ACCEPT), 142 ], 143 }, 144 }, 145 id="test_or", 146 ), 147 pytest.param( 148 { 149 "in": "add allow ip from table(AAA) to table(BBB)", 150 "out": { 151 "objs": [ 152 NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), 153 NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), 154 ], 155 "insns": [ 156 InsnU32(IpFwOpcode.O_IP_SRC_LOOKUP, u32=1), 157 InsnU32(IpFwOpcode.O_IP_DST_LOOKUP, u32=2), 158 InsnEmpty(IpFwOpcode.O_ACCEPT), 159 ], 160 }, 161 }, 162 id="test_tables", 163 ), 164 pytest.param( 165 { 166 "in": "add allow ip from any to 1.2.3.4 // test comment", 167 "out": { 168 "insns": [ 169 InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), 170 InsnComment(comment="test comment"), 171 InsnEmpty(IpFwOpcode.O_ACCEPT), 172 ], 173 }, 174 }, 175 id="test_comment", 176 ), 177 pytest.param( 178 { 179 "in": "add tcp-setmss 123 ip from any to 1.2.3.4", 180 "out": { 181 "objs": [ 182 NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"), 183 ], 184 "insns": [ 185 InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), 186 InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1), 187 Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123), 188 ], 189 }, 190 }, 191 id="test_eaction_tcp-setmss", 192 ), 193 pytest.param( 194 { 195 "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4", 196 "out": { 197 "objs": [ 198 NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"), 199 NTlv(0, idx=2, name="AAA"), 200 ], 201 "insns": [ 202 InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), 203 InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1), 204 InsnU32(IpFwOpcode.O_EXTERNAL_INSTANCE, u32=2), 205 ], 206 }, 207 }, 208 id="test_eaction_ntp", 209 ), 210 pytest.param( 211 { 212 "in": "add // test comment", 213 "out": { 214 "insns": [ 215 InsnComment(comment="test comment"), 216 Insn(IpFwOpcode.O_COUNT), 217 ], 218 }, 219 }, 220 id="test_action_comment", 221 ), 222 pytest.param( 223 { 224 "in": "add check-state :OUT // test comment", 225 "out": { 226 "objs": [ 227 NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"), 228 ], 229 "insns": [ 230 InsnComment(comment="test comment"), 231 InsnU32(IpFwOpcode.O_CHECK_STATE, u32=1), 232 ], 233 }, 234 }, 235 id="test_check_state", 236 ), 237 pytest.param( 238 { 239 "in": "add allow tcp from any to any keep-state :OUT", 240 "out": { 241 "objs": [ 242 NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"), 243 ], 244 "insns": [ 245 InsnU32(IpFwOpcode.O_PROBE_STATE, u32=1), 246 Insn(IpFwOpcode.O_PROTO, arg1=6), 247 InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1), 248 InsnEmpty(IpFwOpcode.O_ACCEPT), 249 ], 250 }, 251 }, 252 id="test_keep_state", 253 ), 254 pytest.param( 255 { 256 "in": "add allow tcp from any to any record-state", 257 "out": { 258 "objs": [ 259 NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"), 260 ], 261 "insns": [ 262 Insn(IpFwOpcode.O_PROTO, arg1=6), 263 InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1), 264 InsnEmpty(IpFwOpcode.O_ACCEPT), 265 ], 266 }, 267 }, 268 id="test_record_state", 269 ), 270 ], 271 ) 272 def test_add_rule(self, rule): 273 """Tests if the compiled rule is sane and matches the spec""" 274 self.verify_rule(rule["in"], rule["out"]) 275 276 @pytest.mark.parametrize( 277 "action", 278 [ 279 pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"), 280 pytest.param( 281 ( 282 "abort", 283 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT), 284 ), 285 id="abort", 286 ), 287 pytest.param( 288 ( 289 "abort6", 290 Insn( 291 IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT 292 ), 293 ), 294 id="abort6", 295 ), 296 pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"), 297 pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"), 298 pytest.param( 299 ( 300 "reject", 301 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST), 302 ), 303 id="reject", 304 ), 305 pytest.param( 306 ( 307 "reset", 308 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST), 309 ), 310 id="reset", 311 ), 312 pytest.param( 313 ( 314 "reset6", 315 Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST), 316 ), 317 id="reset6", 318 ), 319 pytest.param( 320 ( 321 "unreach port", 322 InsnReject( 323 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT 324 ), 325 ), 326 id="unreach_port", 327 ), 328 pytest.param( 329 ( 330 "unreach port", 331 InsnReject( 332 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT 333 ), 334 ), 335 id="unreach_port", 336 ), 337 pytest.param( 338 ( 339 "unreach needfrag", 340 InsnReject( 341 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG 342 ), 343 ), 344 id="unreach_needfrag", 345 ), 346 pytest.param( 347 ( 348 "unreach needfrag 1420", 349 InsnReject( 350 IpFwOpcode.O_REJECT, 351 arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG, 352 mtu=1420, 353 ), 354 ), 355 id="unreach_needfrag_mtu", 356 ), 357 pytest.param( 358 ( 359 "unreach6 port", 360 Insn( 361 IpFwOpcode.O_UNREACH6, 362 arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT, 363 ), 364 ), 365 id="unreach6_port", 366 ), 367 pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"), 368 # TOK_NAT 369 pytest.param( 370 ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42" 371 ), 372 pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"), 373 pytest.param( 374 ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42" 375 ), 376 pytest.param( 377 ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42" 378 ), 379 pytest.param( 380 ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42" 381 ), 382 pytest.param( 383 ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42" 384 ), 385 pytest.param( 386 ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd" 387 ), 388 pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"), 389 pytest.param( 390 ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420" 391 ), 392 # TOK_FORWARD 393 pytest.param( 394 ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)), 395 id="setfib_1", 396 marks=pytest.mark.skip("needs net.fibs>1"), 397 ), 398 pytest.param( 399 ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)), 400 id="setdscp_42", 401 ), 402 pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"), 403 pytest.param( 404 ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return" 405 ), 406 ], 407 ) 408 def test_add_action(self, action): 409 """Tests if the rule action is compiled properly""" 410 rule_in = "add {} ip from any to any".format(action[0]) 411 rule_out = {"insns": [action[1]]} 412 self.verify_rule(rule_in, rule_out) 413 414 @pytest.mark.parametrize( 415 "insn", 416 [ 417 pytest.param( 418 { 419 "in": "add prob 0.7 allow ip from any to any", 420 "out": InsnProb(prob=0.7), 421 }, 422 id="test_prob", 423 ), 424 pytest.param( 425 { 426 "in": "add allow tcp from any to any", 427 "out": InsnProto(arg1=6), 428 }, 429 id="test_proto", 430 ), 431 pytest.param( 432 { 433 "in": "add allow ip from any to any 57", 434 "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), 435 }, 436 id="test_ports", 437 ), 438 ], 439 ) 440 def test_add_single_instruction(self, insn): 441 """Tests if the compiled rule is sane and matches the spec""" 442 443 # Prepare the desired output 444 out = { 445 "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)], 446 } 447 self.verify_rule(insn["in"], out) 448 449 @pytest.mark.parametrize( 450 "opcode", 451 [ 452 pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"), 453 pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"), 454 ], 455 ) 456 @pytest.mark.parametrize( 457 "params", 458 [ 459 pytest.param( 460 { 461 "in": "57", 462 "out": [(57, 57)], 463 }, 464 id="test_single", 465 ), 466 pytest.param( 467 { 468 "in": "57-59", 469 "out": [(57, 59)], 470 }, 471 id="test_range", 472 ), 473 pytest.param( 474 { 475 "in": "57-59,41", 476 "out": [(57, 59), (41, 41)], 477 }, 478 id="test_ranges", 479 ), 480 ], 481 ) 482 def test_add_ports(self, params, opcode): 483 if opcode == IpFwOpcode.O_IP_DSTPORT: 484 txt = "add allow ip from any to any " + params["in"] 485 else: 486 txt = "add allow ip from any " + params["in"] + " to any" 487 out = { 488 "insns": [ 489 InsnPorts(opcode, port_pairs=params["out"]), 490 InsnEmpty(IpFwOpcode.O_ACCEPT), 491 ] 492 } 493 self.verify_rule(txt, out) 494