1import errno 2import json 3import os 4import socket 5import struct 6import subprocess 7import sys 8from ctypes import c_byte 9from ctypes import c_char 10from ctypes import c_int 11from ctypes import c_long 12from ctypes import c_uint32 13from ctypes import c_uint8 14from ctypes import c_ulong 15from ctypes import c_ushort 16from ctypes import sizeof 17from ctypes import Structure 18from enum import Enum 19from typing import Any 20from typing import Dict 21from typing import List 22from typing import NamedTuple 23from typing import Optional 24from typing import Union 25 26import pytest 27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode 28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode 29from atf_python.sys.netpfil.ipfw.insns import Insn 30from atf_python.sys.netpfil.ipfw.insns import InsnComment 31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty 32from atf_python.sys.netpfil.ipfw.insns import InsnIp 33from atf_python.sys.netpfil.ipfw.insns import InsnIp6 34from atf_python.sys.netpfil.ipfw.insns import InsnPorts 35from atf_python.sys.netpfil.ipfw.insns import InsnProb 36from atf_python.sys.netpfil.ipfw.insns import InsnProto 37from atf_python.sys.netpfil.ipfw.insns import InsnReject 38from atf_python.sys.netpfil.ipfw.insns import InsnTable 39from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode 40from atf_python.sys.netpfil.ipfw.ioctl import CTlv 41from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule 42from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType 43from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule 44from atf_python.sys.netpfil.ipfw.ioctl import NTlv 45from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType 46from atf_python.sys.netpfil.ipfw.ioctl import RawRule 47from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader 48from atf_python.sys.netpfil.ipfw.utils import enum_from_int 49from atf_python.utils import BaseTest 50 51 52IPFW_PATH = "/sbin/ipfw" 53 54 55def differ(w_obj, g_obj, w_stack=[], g_stack=[]): 56 if bytes(w_obj) == bytes(g_obj): 57 return True 58 num_objects = 0 59 for i, w_child in enumerate(w_obj.obj_list): 60 if i > len(g_obj.obj_list): 61 print("MISSING object from chain {}".format(" / ".join(w_stack))) 62 w_child.print_obj() 63 print("==========================") 64 return False 65 g_child = g_obj.obj_list[i] 66 if bytes(w_child) == bytes(g_child): 67 num_objects += 1 68 continue 69 w_stack.append(w_obj.obj_name) 70 g_stack.append(g_obj.obj_name) 71 if not differ(w_child, g_child, w_stack, g_stack): 72 return False 73 break 74 if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list): 75 g_child = g_obj.obj_list[num_objects] 76 print("EXTRA object from chain {}".format(" / ".join(g_stack))) 77 g_child.print_obj() 78 print("==========================") 79 return False 80 print("OBJECTS DIFFER") 81 print("WANTED CHAIN: {}".format(" / ".join(w_stack))) 82 w_obj.print_obj() 83 w_obj.print_obj_hex() 84 print("==========================") 85 print("GOT CHAIN: {}".format(" / ".join(g_stack))) 86 g_obj.print_obj() 87 g_obj.print_obj_hex() 88 print("==========================") 89 return False 90 91 92class TestAddRule(BaseTest): 93 def compile_rule(self, out): 94 tlvs = [] 95 if "objs" in out: 96 tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"])) 97 rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"]) 98 tlvs.append(CTlvRule(obj_list=[rule])) 99 return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs) 100 101 def verify_rule(self, in_data: str, out_data): 102 # Prepare the desired output 103 expected = self.compile_rule(out_data) 104 105 reader = DebugIoReader(IPFW_PATH) 106 ioctls = reader.get_records(in_data) 107 assert len(ioctls) == 1 # Only 1 ioctl request expected 108 got = ioctls[0] 109 110 if not differ(expected, got): 111 print("=> CMD: {}".format(in_data)) 112 print("=> WANTED:") 113 expected.print_obj() 114 print("==========================") 115 print("=> GOT:") 116 got.print_obj() 117 print("==========================") 118 assert bytes(got) == bytes(expected) 119 120 @pytest.mark.parametrize( 121 "rule", 122 [ 123 pytest.param( 124 { 125 "in": "add 200 allow ip from any to any", 126 "out": { 127 "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)], 128 "rulenum": 200, 129 }, 130 }, 131 id="test_rulenum", 132 ), 133 pytest.param( 134 { 135 "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any", 136 "out": { 137 "insns": [ 138 InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True), 139 InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"), 140 InsnEmpty(IpFwOpcode.O_ACCEPT), 141 ], 142 }, 143 }, 144 id="test_or", 145 ), 146 pytest.param( 147 { 148 "in": "add allow ip from table(AAA) to table(BBB)", 149 "out": { 150 "objs": [ 151 NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), 152 NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), 153 ], 154 "insns": [ 155 InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1), 156 InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2), 157 InsnEmpty(IpFwOpcode.O_ACCEPT), 158 ], 159 }, 160 }, 161 id="test_tables", 162 ), 163 pytest.param( 164 { 165 "in": "add allow ip from any to 1.2.3.4 // test comment", 166 "out": { 167 "insns": [ 168 InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), 169 InsnComment(comment="test comment"), 170 InsnEmpty(IpFwOpcode.O_ACCEPT), 171 ], 172 }, 173 }, 174 id="test_comment", 175 ), 176 ], 177 ) 178 def test_add_rule(self, rule): 179 """Tests if the compiled rule is sane and matches the spec""" 180 self.verify_rule(rule["in"], rule["out"]) 181 182 @pytest.mark.parametrize( 183 "action", 184 [ 185 pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"), 186 pytest.param( 187 ( 188 "abort", 189 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT), 190 ), 191 id="abort", 192 ), 193 pytest.param( 194 ( 195 "abort6", 196 Insn( 197 IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT 198 ), 199 ), 200 id="abort6", 201 ), 202 pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"), 203 pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"), 204 pytest.param( 205 ( 206 "reject", 207 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST), 208 ), 209 id="reject", 210 ), 211 pytest.param( 212 ( 213 "reset", 214 Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST), 215 ), 216 id="reset", 217 ), 218 pytest.param( 219 ( 220 "reset6", 221 Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST), 222 ), 223 id="reset6", 224 ), 225 pytest.param( 226 ( 227 "unreach port", 228 InsnReject( 229 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT 230 ), 231 ), 232 id="unreach_port", 233 ), 234 pytest.param( 235 ( 236 "unreach port", 237 InsnReject( 238 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT 239 ), 240 ), 241 id="unreach_port", 242 ), 243 pytest.param( 244 ( 245 "unreach needfrag", 246 InsnReject( 247 IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG 248 ), 249 ), 250 id="unreach_needfrag", 251 ), 252 pytest.param( 253 ( 254 "unreach needfrag 1420", 255 InsnReject( 256 IpFwOpcode.O_REJECT, 257 arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG, 258 mtu=1420, 259 ), 260 ), 261 id="unreach_needfrag_mtu", 262 ), 263 pytest.param( 264 ( 265 "unreach6 port", 266 Insn( 267 IpFwOpcode.O_UNREACH6, 268 arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT, 269 ), 270 ), 271 id="unreach6_port", 272 ), 273 pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"), 274 # TOK_NAT 275 pytest.param( 276 ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42" 277 ), 278 pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"), 279 pytest.param( 280 ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42" 281 ), 282 pytest.param( 283 ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42" 284 ), 285 pytest.param( 286 ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42" 287 ), 288 pytest.param( 289 ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42" 290 ), 291 pytest.param( 292 ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd" 293 ), 294 pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"), 295 pytest.param( 296 ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420" 297 ), 298 # TOK_FORWARD 299 # TOK_COMMENT 300 pytest.param( 301 ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)), 302 id="setfib_1", 303 marks=pytest.mark.skip("needs net.fibs>1"), 304 ), 305 pytest.param( 306 ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)), 307 id="setdscp_42", 308 ), 309 pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"), 310 pytest.param( 311 ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return" 312 ), 313 ], 314 ) 315 def test_add_action(self, action): 316 """Tests if the rule action is compiled properly""" 317 rule_in = "add {} ip from any to any".format(action[0]) 318 rule_out = {"insns": [action[1]]} 319 self.verify_rule(rule_in, rule_out) 320 321 @pytest.mark.parametrize( 322 "insn", 323 [ 324 pytest.param( 325 { 326 "in": "add prob 0.7 allow ip from any to any", 327 "out": InsnProb(prob=0.7), 328 }, 329 id="test_prob", 330 ), 331 pytest.param( 332 { 333 "in": "add allow tcp from any to any", 334 "out": InsnProto(arg1=6), 335 }, 336 id="test_proto", 337 ), 338 pytest.param( 339 { 340 "in": "add allow ip from any to any 57", 341 "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), 342 }, 343 id="test_ports", 344 ), 345 ], 346 ) 347 def test_add_single_instruction(self, insn): 348 """Tests if the compiled rule is sane and matches the spec""" 349 350 # Prepare the desired output 351 out = { 352 "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)], 353 } 354 self.verify_rule(insn["in"], out) 355 356 @pytest.mark.parametrize( 357 "opcode", 358 [ 359 pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"), 360 pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"), 361 ], 362 ) 363 @pytest.mark.parametrize( 364 "params", 365 [ 366 pytest.param( 367 { 368 "in": "57", 369 "out": [(57, 57)], 370 }, 371 id="test_single", 372 ), 373 pytest.param( 374 { 375 "in": "57-59", 376 "out": [(57, 59)], 377 }, 378 id="test_range", 379 ), 380 pytest.param( 381 { 382 "in": "57-59,41", 383 "out": [(57, 59), (41, 41)], 384 }, 385 id="test_ranges", 386 ), 387 ], 388 ) 389 def test_add_ports(self, params, opcode): 390 if opcode == IpFwOpcode.O_IP_DSTPORT: 391 txt = "add allow ip from any to any " + params["in"] 392 else: 393 txt = "add allow ip from any " + params["in"] + " to any" 394 out = { 395 "insns": [ 396 InsnPorts(opcode, port_pairs=params["out"]), 397 InsnEmpty(IpFwOpcode.O_ACCEPT), 398 ] 399 } 400 self.verify_rule(txt, out) 401