# xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision acd546f01e58354af049455472980c6c4a52e18b)
1import errno
2import json
3import os
4import socket
5import struct
6import subprocess
7import sys
8from ctypes import c_byte
9from ctypes import c_char
10from ctypes import c_int
11from ctypes import c_long
12from ctypes import c_uint32
13from ctypes import c_uint8
14from ctypes import c_ulong
15from ctypes import c_ushort
16from ctypes import sizeof
17from ctypes import Structure
18from enum import Enum
19from typing import Any
20from typing import Dict
21from typing import List
22from typing import NamedTuple
23from typing import Optional
24from typing import Union
25
26import pytest
27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29from atf_python.sys.netpfil.ipfw.insns import Insn
30from atf_python.sys.netpfil.ipfw.insns import InsnComment
31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32from atf_python.sys.netpfil.ipfw.insns import InsnIp
33from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35from atf_python.sys.netpfil.ipfw.insns import InsnProb
36from atf_python.sys.netpfil.ipfw.insns import InsnProto
37from atf_python.sys.netpfil.ipfw.insns import InsnReject
38from atf_python.sys.netpfil.ipfw.insns import InsnTable
39from atf_python.sys.netpfil.ipfw.insns import InsnU32
40from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
41from atf_python.sys.netpfil.ipfw.ioctl import CTlv
42from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
43from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
44from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
45from atf_python.sys.netpfil.ipfw.ioctl import NTlv
46from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
47from atf_python.sys.netpfil.ipfw.ioctl import RawRule
48from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
49from atf_python.sys.netpfil.ipfw.utils import enum_from_int
50from atf_python.utils import BaseTest
51
52
# Path of the ipfw executable handed to DebugIoReader in TestAddRule.verify_rule.
IPFW_PATH = "/sbin/ipfw"
54
55
def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree with the one obtained and report the diff.

    Both objects must support ``bytes()`` and expose ``obj_list`` (child
    objects), ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.

    The trees are walked in lockstep.  The first child whose serialized form
    differs is descended into, so the diagnostic printed to stdout refers to
    the deepest mismatching object together with the chain of parent names
    that leads to it.

    Args:
        w_obj: The wanted (expected) object.
        g_obj: The object actually obtained.
        w_stack: Names of the ancestors of ``w_obj``; omit for a top-level call.
        g_stack: Names of the ancestors of ``g_obj``; omit for a top-level call.

    Returns:
        True if both objects serialize to the same bytes, False otherwise
        (after printing a description of the first difference found).
    """
    # Fresh lists per top-level call: a shared mutable default would keep
    # accumulating chain names from earlier, unrelated comparisons.
    w_stack = [] if w_stack is None else w_stack
    g_stack = [] if g_stack is None else g_stack

    if bytes(w_obj) == bytes(g_obj):
        return True

    # Number of leading children that are byte-for-byte identical.
    num_objects = 0
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # Descend with extended copies of the chains so the caller's lists
        # (and any later sibling report) are left untouched.
        if not differ(
            w_child,
            g_child,
            w_stack + [w_obj.obj_name],
            g_stack + [g_obj.obj_name],
        ):
            return False
        break

    # Every wanted child matched, but the obtained object has more of them.
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False

    # No child explains the mismatch: the two objects themselves differ.
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
91
92
class TestAddRule(BaseTest):
    """Checks the ioctl request ipfw builds for various ``add`` commands.

    Each test feeds a rule in textual form to the ipfw binary through
    DebugIoReader and compares the single ioctl record obtained with a
    request assembled by hand from the expected instructions.
    """

    def compile_rule(self, out: Dict[str, Any]):
        """Build the expected IP_FW_XADD request from a rule description.

        Args:
            out: Rule description with the keys
                ``insns`` (required): list of instruction objects of the rule,
                ``objs`` (optional): list of named-object TLVs,
                ``rulenum`` (optional): rule number, 0 when absent.

        Returns:
            An IpFwXRule holding the optional object-name list TLV followed
            by the rule TLV.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data: Dict[str, Any]):
        """Assert that ipfw turns ``in_data`` into the request in ``out_data``.

        Args:
            in_data: The ipfw command line (without the binary name).
            out_data: Expected rule description, see compile_rule().

        On mismatch the difference and both complete objects are printed
        before the final assertion fails.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnU32(IpFwOpcode.O_IP_SRC_LOOKUP, u32=1),
                            InsnU32(IpFwOpcode.O_IP_DST_LOOKUP, u32=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1),
                            InsnU32(IpFwOpcode.O_EXTERNAL_INSTANCE, u32=2),
                        ],
                    },
                },
                id="test_eaction_ntp",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            InsnU32(IpFwOpcode.O_CHECK_STATE, u32=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnU32(IpFwOpcode.O_PROBE_STATE, u32=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT: no test case here
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420"
            ),
            # TOK_FORWARD: no test case here
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # NOTE(review): a flat list is passed here while
                    # test_add_ports passes (low, high) tuples - confirm
                    # that InsnPorts accepts both forms.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests a rule made of one instruction followed by the accept action"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests port lists given on the source or the destination side"""
        # The port list follows the address it applies to in the rule text.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
494