xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision 4fbb9c43aa44d9145151bb5f77d302ba01fb7551)
1import errno
2import json
3import os
4import socket
5import struct
6import subprocess
7import sys
8from ctypes import c_byte
9from ctypes import c_char
10from ctypes import c_int
11from ctypes import c_long
12from ctypes import c_uint32
13from ctypes import c_uint8
14from ctypes import c_ulong
15from ctypes import c_ushort
16from ctypes import sizeof
17from ctypes import Structure
18from enum import Enum
19from typing import Any
20from typing import Dict
21from typing import List
22from typing import NamedTuple
23from typing import Optional
24from typing import Union
25
26import pytest
27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29from atf_python.sys.netpfil.ipfw.insns import Insn
30from atf_python.sys.netpfil.ipfw.insns import InsnComment
31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32from atf_python.sys.netpfil.ipfw.insns import InsnIp
33from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35from atf_python.sys.netpfil.ipfw.insns import InsnProb
36from atf_python.sys.netpfil.ipfw.insns import InsnProto
37from atf_python.sys.netpfil.ipfw.insns import InsnReject
38from atf_python.sys.netpfil.ipfw.insns import InsnTable
39from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
40from atf_python.sys.netpfil.ipfw.ioctl import CTlv
41from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
42from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
43from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
44from atf_python.sys.netpfil.ipfw.ioctl import NTlv
45from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
46from atf_python.sys.netpfil.ipfw.ioctl import RawRule
47from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
48from atf_python.sys.netpfil.ipfw.utils import enum_from_int
49from atf_python.utils import BaseTest
50
51
52IPFW_PATH = "/sbin/ipfw"
53
54
def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree with the one actually produced.

    Both objects must support ``bytes()`` and provide ``obj_list``,
    ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.  When the
    serialised forms differ, the children are walked pairwise and a report
    about the first mismatch (missing child, extra child or differing
    object) is printed to stdout.

    Args:
        w_obj: the wanted (expected) object.
        g_obj: the object that was got.
        w_stack: names of the wanted objects enclosing ``w_obj``.  The list
            is extended in place while descending.  Defaults to a new empty
            list.
        g_stack: same as ``w_stack`` for the got side.

    Returns:
        True if both objects serialise to the same bytes, False otherwise.
    """
    # Use fresh lists for every top-level call.  A mutable default would be
    # shared between calls, so names appended by one comparison would leak
    # into the chain printed by every later one.
    if w_stack is None:
        w_stack = []
    if g_stack is None:
        g_stack = []

    if bytes(w_obj) == bytes(g_obj):
        return True
    # Number of leading children that are byte-identical on both sides.
    num_objects = 0
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # First differing child: record the path and descend into it.
        w_stack.append(w_obj.obj_name)
        g_stack.append(g_obj.obj_name)
        if not differ(w_child, g_child, w_stack, g_stack):
            return False
        break
    # Every wanted child matched, but the got side has more children.
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False
    # The difference is in the objects themselves, not in their children.
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
90
91
class TestAddRule(BaseTest):
    """Checks the requests produced for ipfw "add" commands.

    Every test passes a rule text to IPFW_PATH through DebugIoReader and
    compares the single captured record with a hand-built expectation.
    """

    def compile_rule(self, out):
        """Build the expected IP_FW_XADD request from a test description.

        Args:
            out: dict with the mandatory key "insns" (instruction list of
                the rule) and the optional keys "objs" (named-object TLVs)
                and "rulenum" (defaults to 0).

        Returns:
            An IpFwXRule holding the optional IPFW_TLV_TBLNAME_LIST TLV
            followed by the rule TLV.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data: dict) -> None:
        """Assert that the command `in_data` compiles to `out_data`.

        Args:
            in_data: command text handed to DebugIoReader.get_records().
            out_data: expected rule description, see compile_rule().

        On a mismatch the differences are printed before the final
        assertion fails.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2),
                        ],
                    },
                },
                id="test_eaction_ntp",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_CHECK_STATE, arg1=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROBE_STATE, arg1=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            # The "unreach port" case is listed once; an identical second
            # entry with the same id was redundant and has been dropped.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
            ),
            # TOK_FORWARD
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        # action is a (command keyword(s), expected action instruction) pair.
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # NOTE(review): port_pairs is a flat list here, while
                    # test_add_ports passes a list of (low, high) tuples;
                    # confirm which form InsnPorts expects.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if a single instruction followed by "allow" is compiled properly"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests if source and destination port lists are compiled properly"""
        # The port spec follows the address it belongs to: after "to any"
        # for the destination opcode, after "from any" for the source one.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
493