xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision 9f44a47fd07924afc035991af15d84e6585dea4f)
1import errno
2import json
3import os
4import socket
5import struct
6import subprocess
7import sys
8from ctypes import c_byte
9from ctypes import c_char
10from ctypes import c_int
11from ctypes import c_long
12from ctypes import c_uint32
13from ctypes import c_uint8
14from ctypes import c_ulong
15from ctypes import c_ushort
16from ctypes import sizeof
17from ctypes import Structure
18from enum import Enum
19from typing import Any
20from typing import Dict
21from typing import List
22from typing import NamedTuple
23from typing import Optional
24from typing import Union
25
26import pytest
27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29from atf_python.sys.netpfil.ipfw.insns import Insn
30from atf_python.sys.netpfil.ipfw.insns import InsnComment
31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32from atf_python.sys.netpfil.ipfw.insns import InsnIp
33from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35from atf_python.sys.netpfil.ipfw.insns import InsnProb
36from atf_python.sys.netpfil.ipfw.insns import InsnProto
37from atf_python.sys.netpfil.ipfw.insns import InsnReject
38from atf_python.sys.netpfil.ipfw.insns import InsnTable
39from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
40from atf_python.sys.netpfil.ipfw.ioctl import CTlv
41from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
42from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
43from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
44from atf_python.sys.netpfil.ipfw.ioctl import NTlv
45from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
46from atf_python.sys.netpfil.ipfw.ioctl import RawRule
47from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
48from atf_python.sys.netpfil.ipfw.utils import enum_from_int
49from atf_python.utils import BaseTest
50
51
52IPFW_PATH = "/sbin/ipfw"
53
54
def differ(
    w_obj,
    g_obj,
    w_stack: Optional[List[str]] = None,
    g_stack: Optional[List[str]] = None,
) -> bool:
    """Compare a wanted object tree with the one actually produced.

    Both objects must support ``bytes()`` and expose ``obj_list`` (child
    objects), ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.

    The trees are walked in parallel; at the first point where they diverge
    a human-readable report is printed (missing child, extra child, or the
    two differing objects with hex dumps).

    Args:
        w_obj: the wanted (expected) object.
        g_obj: the object that was actually obtained.
        w_stack: names of the wanted-side ancestors visited so far; used only
            for the printed chain.  A caller-supplied list is appended to.
        g_stack: same as ``w_stack`` for the obtained side.

    Returns:
        True if the byte representations are identical, False otherwise
        (after printing the diagnostics).
    """
    # Fresh lists per top-level call: a mutable default would be shared
    # between calls and make the printed chain grow with every failure.
    if w_stack is None:
        w_stack = []
    if g_stack is None:
        g_stack = []

    if bytes(w_obj) == bytes(g_obj):
        return True

    num_objects = 0  # number of leading children that match byte-for-byte
    for i, w_child in enumerate(w_obj.obj_list):
        # ">=" (not ">"): index len(obj_list) is already out of range.
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # First differing child: descend to report the most specific mismatch.
        w_stack.append(w_obj.obj_name)
        g_stack.append(g_obj.obj_name)
        if not differ(w_child, g_child, w_stack, g_stack):
            return False
        break

    # Every wanted child matched, but the obtained object has more of them.
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False

    # The children give no explanation: the objects themselves differ.
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
90
91
class TestAddRule(BaseTest):
    """Checks the request that /sbin/ipfw builds for various ``add`` commands.

    Each test builds the expected request from Python objects, obtains the
    request generated by the ipfw binary for a textual rule through
    ``DebugIoReader`` and compares the two byte-for-byte.
    """

    def compile_rule(self, out: Dict[str, Any]):
        """Build the expected IP_FW_XADD request from a rule description.

        Args:
            out: dict with the mandatory key ``"insns"`` (list of rule
                instructions) and the optional keys ``"objs"`` (named-object
                TLVs, emitted as a table-name list before the rule) and
                ``"rulenum"`` (defaults to 0).

        Returns:
            The ``IpFwXRule`` object wrapping the assembled TLVs.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data: Dict[str, Any]) -> None:
        """Assert that ipfw compiles ``in_data`` into the expected request.

        Args:
            in_data: ipfw command line (without the binary name), e.g.
                ``"add allow ip from any to any"``.
            out_data: expected rule description, see ``compile_rule``.

        On mismatch a detailed diff is printed before the assertion fails.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            # The "unreach port" case used to be listed twice with the same
            # id; a single copy is enough.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
            ),
            # TOK_FORWARD
            # TOK_COMMENT
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        # action is a (command keyword(s), expected instruction) pair
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # NOTE(review): test_add_ports passes port_pairs as a list
                    # of (low, high) tuples, while this is a flat list --
                    # confirm against InsnPorts which form is intended.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if a single match instruction followed by accept is compiled properly"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests if source/destination port specifications are compiled properly"""
        # The port list follows the address it applies to.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
401