xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision 2d1d418e1e7bc8325bb052185c17c81a674d0c4e)
1import errno
2import json
3import os
4import socket
5import struct
6import subprocess
7import sys
8from ctypes import c_byte
9from ctypes import c_char
10from ctypes import c_int
11from ctypes import c_long
12from ctypes import c_uint32
13from ctypes import c_uint8
14from ctypes import c_ulong
15from ctypes import c_ushort
16from ctypes import sizeof
17from ctypes import Structure
18from enum import Enum
19from typing import Any
20from typing import Dict
21from typing import List
22from typing import NamedTuple
23from typing import Optional
24from typing import Union
25
26import pytest
27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29from atf_python.sys.netpfil.ipfw.insns import Insn
30from atf_python.sys.netpfil.ipfw.insns import InsnComment
31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32from atf_python.sys.netpfil.ipfw.insns import InsnIp
33from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35from atf_python.sys.netpfil.ipfw.insns import InsnProb
36from atf_python.sys.netpfil.ipfw.insns import InsnProto
37from atf_python.sys.netpfil.ipfw.insns import InsnReject
38from atf_python.sys.netpfil.ipfw.insns import InsnTable
39from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
40from atf_python.sys.netpfil.ipfw.ioctl import CTlv
41from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
42from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
43from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
44from atf_python.sys.netpfil.ipfw.ioctl import NTlv
45from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
46from atf_python.sys.netpfil.ipfw.ioctl import RawRule
47from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
48from atf_python.sys.netpfil.ipfw.utils import enum_from_int
49from atf_python.utils import BaseTest
50
51
52IPFW_PATH = "/sbin/ipfw"
53
54
55def differ(w_obj, g_obj, w_stack=[], g_stack=[]):
56    if bytes(w_obj) == bytes(g_obj):
57        return True
58    num_objects = 0
59    for i, w_child in enumerate(w_obj.obj_list):
60        if i > len(g_obj.obj_list):
61            print("MISSING object from chain {}".format(" / ".join(w_stack)))
62            w_child.print_obj()
63            print("==========================")
64            return False
65        g_child = g_obj.obj_list[i]
66        if bytes(w_child) == bytes(g_child):
67            num_objects += 1
68            continue
69        w_stack.append(w_obj.obj_name)
70        g_stack.append(g_obj.obj_name)
71        if not differ(w_child, g_child, w_stack, g_stack):
72            return False
73        break
74    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
75        g_child = g_obj.obj_list[num_objects]
76        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
77        g_child.print_obj()
78        print("==========================")
79        return False
80    print("OBJECTS DIFFER")
81    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
82    w_obj.print_obj()
83    w_obj.print_obj_hex()
84    print("==========================")
85    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
86    g_obj.print_obj()
87    g_obj.print_obj_hex()
88    print("==========================")
89    return False
90
91
92class TestAddRule(BaseTest):
93    def compile_rule(self, out):
94        tlvs = []
95        if "objs" in out:
96            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
97        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
98        tlvs.append(CTlvRule(obj_list=[rule]))
99        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)
100
101    def verify_rule(self, in_data: str, out_data):
102        # Prepare the desired output
103        expected = self.compile_rule(out_data)
104
105        reader = DebugIoReader(IPFW_PATH)
106        ioctls = reader.get_records(in_data)
107        assert len(ioctls) == 1  # Only 1 ioctl request expected
108        got = ioctls[0]
109
110        if not differ(expected, got):
111            print("=> CMD: {}".format(in_data))
112            print("=> WANTED:")
113            expected.print_obj()
114            print("==========================")
115            print("=> GOT:")
116            got.print_obj()
117            print("==========================")
118        assert bytes(got) == bytes(expected)
119
120    @pytest.mark.parametrize(
121        "rule",
122        [
123            pytest.param(
124                {
125                    "in": "add 200 allow ip from any to any",
126                    "out": {
127                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
128                        "rulenum": 200,
129                    },
130                },
131                id="test_rulenum",
132            ),
133            pytest.param(
134                {
135                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
136                    "out": {
137                        "insns": [
138                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
139                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
140                            InsnEmpty(IpFwOpcode.O_ACCEPT),
141                        ],
142                    },
143                },
144                id="test_or",
145            ),
146            pytest.param(
147                {
148                    "in": "add allow ip from table(AAA) to table(BBB)",
149                    "out": {
150                        "objs": [
151                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
152                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
153                        ],
154                        "insns": [
155                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
156                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
157                            InsnEmpty(IpFwOpcode.O_ACCEPT),
158                        ],
159                    },
160                },
161                id="test_tables",
162            ),
163            pytest.param(
164                {
165                    "in": "add allow ip from any to 1.2.3.4 // test comment",
166                    "out": {
167                        "insns": [
168                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
169                            InsnComment(comment="test comment"),
170                            InsnEmpty(IpFwOpcode.O_ACCEPT),
171                        ],
172                    },
173                },
174                id="test_comment",
175            ),
176            pytest.param(
177                {
178                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
179                    "out": {
180                        "objs": [
181                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
182                        ],
183                        "insns": [
184                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
185                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
186                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
187                        ],
188                    },
189                },
190                id="test_eaction_tcp-setmss",
191            ),
192            pytest.param(
193                {
194                    "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4",
195                    "out": {
196                        "objs": [
197                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"),
198                            NTlv(0, idx=2, name="AAA"),
199                        ],
200                        "insns": [
201                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
202                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
203                            Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2),
204                        ],
205                    },
206                },
207                id="test_eaction_ntp",
208            ),
209        ],
210    )
211    def test_add_rule(self, rule):
212        """Tests if the compiled rule is sane and matches the spec"""
213        self.verify_rule(rule["in"], rule["out"])
214
215    @pytest.mark.parametrize(
216        "action",
217        [
218            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
219            pytest.param(
220                (
221                    "abort",
222                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
223                ),
224                id="abort",
225            ),
226            pytest.param(
227                (
228                    "abort6",
229                    Insn(
230                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
231                    ),
232                ),
233                id="abort6",
234            ),
235            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
236            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
237            pytest.param(
238                (
239                    "reject",
240                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
241                ),
242                id="reject",
243            ),
244            pytest.param(
245                (
246                    "reset",
247                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
248                ),
249                id="reset",
250            ),
251            pytest.param(
252                (
253                    "reset6",
254                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
255                ),
256                id="reset6",
257            ),
258            pytest.param(
259                (
260                    "unreach port",
261                    InsnReject(
262                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
263                    ),
264                ),
265                id="unreach_port",
266            ),
267            pytest.param(
268                (
269                    "unreach port",
270                    InsnReject(
271                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
272                    ),
273                ),
274                id="unreach_port",
275            ),
276            pytest.param(
277                (
278                    "unreach needfrag",
279                    InsnReject(
280                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
281                    ),
282                ),
283                id="unreach_needfrag",
284            ),
285            pytest.param(
286                (
287                    "unreach needfrag 1420",
288                    InsnReject(
289                        IpFwOpcode.O_REJECT,
290                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
291                        mtu=1420,
292                    ),
293                ),
294                id="unreach_needfrag_mtu",
295            ),
296            pytest.param(
297                (
298                    "unreach6 port",
299                    Insn(
300                        IpFwOpcode.O_UNREACH6,
301                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
302                    ),
303                ),
304                id="unreach6_port",
305            ),
306            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
307            # TOK_NAT
308            pytest.param(
309                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
310            ),
311            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
312            pytest.param(
313                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
314            ),
315            pytest.param(
316                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
317            ),
318            pytest.param(
319                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
320            ),
321            pytest.param(
322                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
323            ),
324            pytest.param(
325                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
326            ),
327            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
328            pytest.param(
329                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
330            ),
331            # TOK_FORWARD
332            # TOK_COMMENT
333            pytest.param(
334                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
335                id="setfib_1",
336                marks=pytest.mark.skip("needs net.fibs>1"),
337            ),
338            pytest.param(
339                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
340                id="setdscp_42",
341            ),
342            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
343            pytest.param(
344                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
345            ),
346        ],
347    )
348    def test_add_action(self, action):
349        """Tests if the rule action is compiled properly"""
350        rule_in = "add {} ip from any to any".format(action[0])
351        rule_out = {"insns": [action[1]]}
352        self.verify_rule(rule_in, rule_out)
353
354    @pytest.mark.parametrize(
355        "insn",
356        [
357            pytest.param(
358                {
359                    "in": "add prob 0.7 allow ip from any to any",
360                    "out": InsnProb(prob=0.7),
361                },
362                id="test_prob",
363            ),
364            pytest.param(
365                {
366                    "in": "add allow tcp from any to any",
367                    "out": InsnProto(arg1=6),
368                },
369                id="test_proto",
370            ),
371            pytest.param(
372                {
373                    "in": "add allow ip from any to any 57",
374                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
375                },
376                id="test_ports",
377            ),
378        ],
379    )
380    def test_add_single_instruction(self, insn):
381        """Tests if the compiled rule is sane and matches the spec"""
382
383        # Prepare the desired output
384        out = {
385            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
386        }
387        self.verify_rule(insn["in"], out)
388
389    @pytest.mark.parametrize(
390        "opcode",
391        [
392            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
393            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
394        ],
395    )
396    @pytest.mark.parametrize(
397        "params",
398        [
399            pytest.param(
400                {
401                    "in": "57",
402                    "out": [(57, 57)],
403                },
404                id="test_single",
405            ),
406            pytest.param(
407                {
408                    "in": "57-59",
409                    "out": [(57, 59)],
410                },
411                id="test_range",
412            ),
413            pytest.param(
414                {
415                    "in": "57-59,41",
416                    "out": [(57, 59), (41, 41)],
417                },
418                id="test_ranges",
419            ),
420        ],
421    )
422    def test_add_ports(self, params, opcode):
423        if opcode == IpFwOpcode.O_IP_DSTPORT:
424            txt = "add allow ip from any to any " + params["in"]
425        else:
426            txt = "add allow ip from any " + params["in"] + " to any"
427        out = {
428            "insns": [
429                InsnPorts(opcode, port_pairs=params["out"]),
430                InsnEmpty(IpFwOpcode.O_ACCEPT),
431            ]
432        }
433        self.verify_rule(txt, out)
434