xref: /freebsd/sbin/ipfw/tests/test_add_rule.py (revision 7e1ec25c8b6d4090ab0c1fcac4f048015c216267)
1 import errno
2 import json
3 import os
4 import socket
5 import struct
6 import subprocess
7 import sys
8 from ctypes import c_byte
9 from ctypes import c_char
10 from ctypes import c_int
11 from ctypes import c_long
12 from ctypes import c_uint32
13 from ctypes import c_uint8
14 from ctypes import c_ulong
15 from ctypes import c_ushort
16 from ctypes import sizeof
17 from ctypes import Structure
18 from enum import Enum
19 from typing import Any
20 from typing import Dict
21 from typing import List
22 from typing import NamedTuple
23 from typing import Optional
24 from typing import Union
25 
26 import pytest
27 from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28 from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29 from atf_python.sys.netpfil.ipfw.insns import Insn
30 from atf_python.sys.netpfil.ipfw.insns import InsnComment
31 from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32 from atf_python.sys.netpfil.ipfw.insns import InsnIp
33 from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34 from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35 from atf_python.sys.netpfil.ipfw.insns import InsnProb
36 from atf_python.sys.netpfil.ipfw.insns import InsnProto
37 from atf_python.sys.netpfil.ipfw.insns import InsnReject
38 from atf_python.sys.netpfil.ipfw.insns import InsnTable
39 from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
40 from atf_python.sys.netpfil.ipfw.ioctl import CTlv
41 from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
42 from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
43 from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
44 from atf_python.sys.netpfil.ipfw.ioctl import NTlv
45 from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
46 from atf_python.sys.netpfil.ipfw.ioctl import RawRule
47 from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
48 from atf_python.sys.netpfil.ipfw.utils import enum_from_int
49 from atf_python.utils import BaseTest
50 
51 
52 IPFW_PATH = "/sbin/ipfw"
53 
54 
def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree with a received one and print the first mismatch.

    Both objects must support ``bytes()`` and expose ``obj_list`` (child
    objects), ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.

    The trees are walked in parallel, descending into the first child pair
    whose serialized bytes differ, until the mismatch can be attributed to a
    missing child, an extra child, or the object itself.

    Args:
        w_obj: the wanted (expected) object.
        g_obj: the object that was actually produced.
        w_stack: names of the ancestors of ``w_obj``; omit for a top-level call.
        g_stack: names of the ancestors of ``g_obj``; omit for a top-level call.
            Neither list is modified.

    Returns:
        True if both objects serialize to identical bytes, False otherwise
        (after printing a description of the difference to stdout).
    """
    # Start every top-level call with fresh chains. A mutable default list
    # would be shared between calls, so names appended during one failed
    # comparison would show up in the chains printed by the next one.
    w_stack = [] if w_stack is None else w_stack
    g_stack = [] if g_stack is None else g_stack

    if bytes(w_obj) == bytes(g_obj):
        return True
    num_objects = 0  # number of leading children that match byte-for-byte
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # Descend with extended copies of the chains so that neither the
        # caller's lists nor a shared default is ever mutated.
        if not differ(
            w_child,
            g_child,
            [*w_stack, w_obj.obj_name],
            [*g_stack, g_obj.obj_name],
        ):
            return False
        # Defensive only: the children differ, so the call above reports
        # the mismatch and returns False.
        break
    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        # Every wanted child matched, but the received object has more.
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False
    # The children give no explanation: the objects themselves differ.
    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
90 
91 
class TestAddRule(BaseTest):
    """Checks the requests produced for various ``ipfw add ...`` rule texts.

    Each test feeds a rule text to ``DebugIoReader(IPFW_PATH)`` and compares
    the single captured request with a hand-built expected ``IpFwXRule``.
    """

    def compile_rule(self, out):
        """Build the expected ``IP_FW_XADD`` request from a rule description.

        Args:
            out: dict with the key ``"insns"`` (list of instruction objects)
                and the optional keys ``"objs"`` (list of name TLVs, wrapped
                into an ``IPFW_TLV_TBLNAME_LIST`` TLV that precedes the rule)
                and ``"rulenum"`` (defaults to 0).

        Returns:
            An ``IpFwXRule`` holding the optional name list followed by the
            rule TLV.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Assert that the rule text ``in_data`` produces the request ``out_data``.

        Args:
            in_data: rule text handed to ``DebugIoReader.get_records()``.
            out_data: expected rule description, see ``compile_rule()``.

        Exactly one captured record is expected. On a mismatch ``differ()``
        prints where the two object trees diverge and both objects are dumped
        before the final byte-for-byte assertion fails.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1),
                            Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2),
                        ],
                    },
                },
                id="test_eaction_ntp",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_CHECK_STATE, arg1=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROBE_STATE, arg1=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            Insn(IpFwOpcode.O_KEEP_STATE, arg1=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    # Each case is (action text, expected instruction); the action text is
    # substituted into "add {} ip from any to any" by test_add_action().
    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            # "unreach port" is listed once; a second identical entry with the
            # same id would only run the same case again.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
            ),
            # TOK_FORWARD
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # NOTE(review): port_pairs is a flat list here, while
                    # test_add_ports passes (low, high) tuples - confirm
                    # which shape InsnPorts expects.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if the compiled rule is sane and matches the spec"""

        # Prepare the desired output: the instruction under test followed
        # by the "allow" action used in every input rule above.
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests source and destination port specifications

        Every port spec is tried as a source and as a destination port list;
        the expected port pairs are the same for both opcodes.
        """
        # The port list follows the address it applies to.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
493