# xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision 2872268c7f6d473aae9b02ebb5d2c24fc2cff9b1)
import errno
import json
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

import pytest
from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insns import Insn
from atf_python.sys.netpfil.ipfw.insns import InsnComment
from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
from atf_python.sys.netpfil.ipfw.insns import InsnIp
from atf_python.sys.netpfil.ipfw.insns import InsnIp6
from atf_python.sys.netpfil.ipfw.insns import InsnPorts
from atf_python.sys.netpfil.ipfw.insns import InsnProb
from atf_python.sys.netpfil.ipfw.insns import InsnProto
from atf_python.sys.netpfil.ipfw.insns import InsnReject
from atf_python.sys.netpfil.ipfw.insns import InsnTable
from atf_python.sys.netpfil.ipfw.insns import InsnU32
from atf_python.sys.netpfil.ipfw.insns import InsnKidx
from atf_python.sys.netpfil.ipfw.insns import InsnLookup
from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableLookupType
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableValueType
from atf_python.sys.netpfil.ipfw.ioctl import CTlv
from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
from atf_python.sys.netpfil.ipfw.ioctl import NTlv
from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
from atf_python.sys.netpfil.ipfw.ioctl import RawRule
from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.utils import BaseTest
55
56
# Path of the ipfw binary; TestAddRule.verify_rule hands it to DebugIoReader
# to obtain the ioctl requests produced for a given command line.
IPFW_PATH = "/sbin/ipfw"
58
59
def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Report the first difference between a wanted and a received object.

    Both objects must support ``bytes()`` and provide ``obj_list``,
    ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.  Children are
    compared pairwise, in order; the walk descends into the first pair whose
    serializations differ and prints a diagnostic for the innermost mismatch.

    Args:
        w_obj: the wanted (expected) object.
        g_obj: the object that was actually received.
        w_stack: names of the wanted-side ancestors of ``w_obj``.  Defaults
            to an empty chain; the passed list is never modified.
        g_stack: same as ``w_stack`` for the received side.

    Returns:
        True when both objects serialize to identical bytes, False otherwise
        (after a description of the mismatch has been printed).
    """
    # Work on private copies.  The chains used to be mutable default
    # arguments that were appended to in place, so ancestor names leaked
    # from one top-level call into the next one.
    w_stack = [] if w_stack is None else list(w_stack)
    g_stack = [] if g_stack is None else list(g_stack)

    if bytes(w_obj) == bytes(g_obj):
        return True

    num_objects = 0  # number of leading children that match exactly
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # First mismatching pair: descend with the chains extended by the
        # current level.  Only this pair is inspected.
        if not differ(
            w_child,
            g_child,
            w_stack + [w_obj.obj_name],
            g_stack + [g_obj.obj_name],
        ):
            return False
        break

    # Every wanted child matched, but the received object has more of them.
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False

    # The difference is in the objects themselves, not in their children.
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
95
96
class TestAddRule(BaseTest):
    """Check that ``ipfw add ...`` command lines compile to the expected ioctl.

    Each test builds the expected ``IP_FW_XADD`` request from instruction
    objects and compares it, byte for byte, with the single request that
    ``DebugIoReader(IPFW_PATH)`` records for the same command line.
    """

    def compile_rule(self, out):
        """Build the expected ``IP_FW_XADD`` request from a rule description.

        Args:
            out: dict with a mandatory ``"insns"`` list (the rule
                instructions), an optional ``"objs"`` list (wrapped into an
                ``IPFW_TLV_TBLNAME_LIST`` TLV placed before the rule) and an
                optional ``"rulenum"`` (0 when absent).

        Returns:
            The ``IpFwXRule`` request object.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Assert that ``in_data`` produces exactly the request ``out_data``.

        Args:
            in_data: ipfw command line, without the program name.
            out_data: rule description accepted by ``compile_rule``.

        On a mismatch the first differing object is reported by ``differ``
        and both complete requests are printed before the assertion fails.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip4 from 0.0.0.0/0 to 192.0.2.1/0",
                    "out": {
                        "insns": [
                            InsnEmpty(IpFwOpcode.O_IP4),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_zero_addrmask4",
            ),
            pytest.param(
                {
                    "in": "add allow ip6 from ::/0 to 2001:DB8::/0",
                    "out": {
                        "insns": [
                            InsnEmpty(IpFwOpcode.O_IP6),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_zero_addrmask6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnKidx(IpFwOpcode.O_IP_DST_LOOKUP, kidx=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            # "lookup <key>[:<mask>] <table>" cases
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP
                                ),
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_no_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup mark:0xf00baa1 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_MARK,
                                    bitmask=True,
                                ),
                                bitmask=0xf00baa1,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_u32_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-mac:1a:2b:3c:4d:5e:6f BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_MAC,
                                    bitmask=True,
                                ),
                                bitmask="1a:2b:3c:4d:5e:6f",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_mac_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip4:1715004 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP4,
                                    bitmask=True,
                                ),
                                bitmask="60.43.26.0",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip4_numeric",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-ip4:0.0.0.255 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_IP4,
                                    bitmask=True,
                                ),
                                bitmask="0.0.0.255",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_src_ip4_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup jail:0.0.252.128 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_JAIL,
                                    bitmask=True,
                                ),
                                bitmask=64640,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_jail_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip6:ffff:ffff:f00:baaa:b00c:: BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP6,
                                    bitmask=True,
                                ),
                                bitmask="ffff:ffff:f00:baaa:b00c::",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip6",
            ),
            # "table(<name>,<value>)" cases: match on the table value as well
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,16777215) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_TAG
                                ),
                                value=16777215,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_legacy",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nat=1231) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NAT
                                ),
                                value=1231,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nat",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh4=10.20.30.40) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH4
                                ),
                                value="10.20.30.40",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh4",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh6=ff02:1234:b00c::abcd) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH6
                                ),
                                value="ff02:1234:b00c::abcd",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            # external actions
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction nptv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="nptv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_INSTANCE, kidx=2),
                        ],
                    },
                },
                id="test_eaction_nptv6",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            # stateful rules
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            InsnKidx(IpFwOpcode.O_CHECK_STATE, kidx=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_PROBE_STATE, kidx=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            # This case used to be listed twice with the same id; the
            # copy-pasted duplicate has been dropped.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT (no test case here)
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42"
            ),
            pytest.param(
                ("skipto 4200", InsnU32(IpFwOpcode.O_SKIPTO, u32=4200)),
                id="skipto_4200",
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420"
            ),
            # TOK_FORWARD (no test case here)
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        # action is a (command text, expected action instruction) pair
        action_text, action_insn = action
        rule_in = "add {} ip from any to any".format(action_text)
        rule_out = {"insns": [action_insn]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # NOTE(review): test_add_ports passes port_pairs as a list
                    # of (low, high) tuples, this case passes a flat list --
                    # confirm that InsnPorts accepts both forms.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests a rule made of one match instruction followed by O_ACCEPT"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests port lists given on the source or the destination side"""
        # The port list follows the address it applies to.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
737