#!/usr/bin/env python3
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.sys.netpfil.ipfw.utils import enum_or_int
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map


class DebugHeader(Structure):
    """Fixed-size header that precedes every debug record.

    ``total_len`` is treated by the readers below as the length of the
    whole record, header included.
    """

    _fields_ = [
        ("cmd_type", c_ushort),
        ("spare1", c_ushort),
        ("opt_name", c_uint32),
        ("total_len", c_uint32),
        ("spare2", c_uint32),
    ]


class DebugType(Enum):
    """Values carried in ``DebugHeader.cmd_type``."""

    DO_CMD = 1
    DO_SET3 = 2
    DO_GET3 = 3


class DebugIoReader(object):
    """Decode debug records from an ipfw binary, a byte buffer or stdin."""

    # Handler classes eligible to decode a record, grouped by record type.
    HANDLER_CLASSES = {
        DebugType.DO_CMD: legacy_classes,
        DebugType.DO_SET3: set3_classes,
        DebugType.DO_GET3: get3_classes,
    }

    def __init__(self, ipfw_path):
        """Remember the ipfw binary path and build the dispatch table."""
        self._msgmap = self.build_msgmap()
        self.ipfw_path = ipfw_path

    def build_msgmap(self):
        """Return ``{cmd_type: {opt_name: handler_class}}``.

        Both key levels are normalised with ``enum_or_int``.  Every entry
        of a handler class' ``messages`` attribute maps to that class; a
        later class overrides an earlier one for the same message.
        """
        xmap = {}
        for debug_type, handler_classes in self.HANDLER_CLASSES.items():
            per_type = xmap.setdefault(enum_or_int(debug_type), {})
            for handler_class in handler_classes:
                for msg in handler_class.messages:
                    per_type[enum_or_int(msg)] = handler_class
        return xmap

    def print_obj_header(self, hdr):
        """Print a one-line summary of ``hdr``.

        The record type is shown as the lower-cased ``DebugType`` name when
        ``hdr.cmd_type`` matches a key of ``HANDLER_CLASSES``, otherwise as
        ``#<number>``.
        """
        debug_type = next(
            (
                _type.name.lower()
                for _type in self.HANDLER_CLASSES
                if _type.value == hdr.cmd_type
            ),
            "#{}".format(hdr.cmd_type),
        )
        print(
            "@@ record for {} len={} optname={}".format(
                debug_type, hdr.total_len, hdr.opt_name
            )
        )

    def parse_record(self, data):
        """Decode one record (header followed by payload).

        The header selects a handler class; the bytes after the header are
        passed to that class' ``from_bytes`` and its result is returned.

        Raises:
            ValueError: ``data`` is shorter than a header, or no handler is
                registered for the ``cmd_type``/``opt_name`` pair.
        """
        hdr_size = sizeof(DebugHeader)
        hdr = DebugHeader.from_buffer_copy(data[:hdr_size])
        # An unknown cmd_type must end up in the ValueError below instead of
        # escaping as a bare KeyError, hence the empty-dict fallback.
        cls = self._msgmap.get(hdr.cmd_type, {}).get(hdr.opt_name)
        if cls is None:
            raise ValueError(
                "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
            )
        return cls.from_bytes(data[hdr_size:])

    def get_record_from_stdin(self):
        """Read and decode the next record from binary stdin.

        Returns:
            The decoded record, or ``None`` when stdin is already at EOF.

        Raises:
            ValueError: the header is truncated, announces a length smaller
                than the header itself, or names an unsupported record.
        """
        hdr_size = sizeof(DebugHeader)
        stream = sys.stdin.buffer
        # read() waits for the full header (or EOF); peek() may legitimately
        # return fewer bytes than requested when the input is a pipe.
        hdr_bytes = stream.read(hdr_size)
        if not hdr_bytes:
            return None
        if len(hdr_bytes) < hdr_size:
            raise ValueError(
                "truncated record header: got {} of {} bytes".format(
                    len(hdr_bytes), hdr_size
                )
            )

        hdr = DebugHeader.from_buffer_copy(hdr_bytes)
        if hdr.total_len < hdr_size:
            raise ValueError(
                "invalid record length {} (header is {} bytes)".format(
                    hdr.total_len, hdr_size
                )
            )
        # total_len counts the header, which has already been consumed.
        payload = stream.read(hdr.total_len - hdr_size)
        return self.parse_record(hdr_bytes + payload)

    def get_records_from_buffer(self, data):
        """Decode every record stored back to back in ``data``.

        Decoding stops once fewer bytes than a header remain; such trailing
        bytes are ignored.

        Returns:
            The decoded records, in buffer order.

        Raises:
            ValueError: a record announces a length smaller than the header
                or names an unsupported record.
        """
        hdr_size = sizeof(DebugHeader)
        off = 0
        ret = []
        while off + hdr_size <= len(data):
            hdr = DebugHeader.from_buffer_copy(data[off : off + hdr_size])
            if hdr.total_len < hdr_size:
                # Fail with a message that names the offending offset.
                raise ValueError(
                    "invalid record length {} at offset {} (header is {} bytes)".format(
                        hdr.total_len, off, hdr_size
                    )
                )
            ret.append(self.parse_record(data[off : off + hdr.total_len]))
            off += hdr.total_len
        return ret

    def run_ipfw(self, cmd: str) -> bytes:
        """Run ``<ipfw_path> -xqn <cmd...>`` and return its raw stdout.

        ``cmd`` is split on whitespace, so quoting inside it is not
        honoured.  The exit status and stderr are not inspected.
        """
        args = [self.ipfw_path, "-xqn"] + cmd.split()
        r = subprocess.run(args, capture_output=True)
        return r.stdout

    def get_records(self, cmd: str):
        """Run ipfw with ``cmd`` and decode the records it writes to stdout."""
        return self.get_records_from_buffer(self.run_ipfw(cmd))