#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

"""
YNL cli tool
"""

import argparse
import json
import os
import pathlib
import pprint
import shutil
import sys
import textwrap

# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException

SYS_SCHEMA_DIR='/usr/share/ynl'
RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'

# pylint: disable=too-few-public-methods,too-many-locals
class Colors:
    """ANSI color and font modifier codes"""
    RESET = '\033[0m'

    BOLD = '\033[1m'
    ITALICS = '\033[3m'
    UNDERLINE = '\033[4m'
    INVERT = '\033[7m'


def color(text, modifiers):
    """Add color to text if output is a TTY

    Args:
        text: the string to decorate
        modifiers: one escape code from Colors, or a list of them to combine

    Returns:
        Colored text if stdout is a TTY, otherwise plain text
    """
    if sys.stdout.isatty():
        # Join the colors if they are a list, if it's a string this is a noop
        modifiers = "".join(modifiers)
        return f"{modifiers}{text}{Colors.RESET}"
    return text


def term_width():
    """
    Get terminal width in columns, as reported by shutil.get_terminal_size()
    (which falls back to 80 columns when the size cannot be determined).
    """
    return shutil.get_terminal_size().columns


def schema_dir():
    """
    Return the effective schema directory, preferring in-tree before
    system schema directory.

    Raises:
        YnlException: if neither the in-tree nor the system directory exists
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    schema_dir_ = os.path.abspath(os.path.join(script_dir, RELATIVE_SCHEMA_DIR))
    if not os.path.isdir(schema_dir_):
        schema_dir_ = SYS_SCHEMA_DIR
    if not os.path.isdir(schema_dir_):
        raise YnlException(f"Schema directory {schema_dir_} does not exist")
    return schema_dir_


def spec_dir():
    """
    Return the effective spec directory, relative to the effective
    schema directory.

    Raises:
        YnlException: if the schema directory or its 'specs' subdirectory
            does not exist
    """
    spec_dir_ = os.path.join(schema_dir(), 'specs')
    if not os.path.isdir(spec_dir_):
        raise YnlException(f"Spec directory {spec_dir_} does not exist")
    return spec_dir_


class YnlEncoder(json.JSONEncoder):
    """A custom encoder for emitting JSON with ynl-specific instance types"""
    def default(self, o):
        """Encode bytes as a hex string and sets as lists; defer otherwise."""
        if isinstance(o, bytes):
            return bytes.hex(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)


def print_attr_list(ynl, attr_names, attr_set, indent=2):
    """Print a list of attributes with their types and documentation.

    Args:
        ynl: family object; its 'consts' and 'attr_sets' mappings are used
            to expand enums and nested attribute sets
        attr_names: names of the attributes to print, names which are not
            present in attr_set are silently skipped
        attr_set: attribute set the names are looked up in (via '.attrs')
        indent: number of spaces to indent this level by, nested sets are
            printed 4 columns deeper
    """
    prefix = ' ' * indent
    for attr_name in attr_names:
        if attr_name not in attr_set.attrs:
            continue

        attr = attr_set.attrs[attr_name]
        attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
        if 'enum' in attr.yaml:
            enum_name = attr.yaml['enum']
            attr_info += f" (enum: {enum_name})"
            # Print enum values if available
            if enum_name in ynl.consts:
                const = ynl.consts[enum_name]
                enum_values = list(const.entries.keys())
                type_fmted = color(const.type.capitalize(), Colors.ITALICS)
                attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}"

        # Show nested attributes reference and recursively display them
        nested_set_name = None
        if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
            nested_set_name = attr.yaml['nested-attributes']
            attr_info += f" -> {nested_set_name}"

        if attr.yaml.get('doc'):
            doc_prefix = prefix + ' ' * 4
            doc_text = textwrap.fill(attr.yaml['doc'], width=term_width(),
                                     initial_indent=doc_prefix,
                                     subsequent_indent=doc_prefix)
            attr_info += f"\n{doc_text}"
        print(attr_info)

        # Recursively show nested attributes
        if nested_set_name in ynl.attr_sets:
            nested_set = ynl.attr_sets[nested_set_name]
            # Filter out 'unspec' and other unused attrs
            nested_names = [n for n in nested_set.attrs.keys()
                            if nested_set.attrs[n].type != 'unused']
            if nested_names:
                print_attr_list(ynl, nested_names, nested_set, indent + 4)


def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None):
    """Print a given mode (do/dump/event/notify).

    Args:
        ynl: family object, passed through to print_attr_list()
        mode: name of the mode, used (capitalized) in the section titles
        mode_spec: the YAML dict of the mode, its optional 'request' and
            'reply' entries are printed
        attr_set: attribute set the attribute names are resolved in
        consistent_dd_reply: when true the reply is printed only once, under
            a combined 'Do and Dump' title, while handling the dump mode
    """
    mode_title = mode.capitalize()

    if 'request' in mode_spec and 'attributes' in mode_spec['request']:
        print(f'\n{mode_title} request attributes:')
        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)

    if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
        if consistent_dd_reply and mode == "do":
            title = None  # Dump handling will print in combined format
        elif consistent_dd_reply and mode == "dump":
            title = 'Do and Dump'
        else:
            title = mode_title
        if title:
            print(f'\n{title} reply attributes:')
            print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)


def do_doc(ynl, op):
    """Handle --list-attrs $op, print the attr information to stdout

    Args:
        ynl: family object, used to resolve the message a notification
            refers to and passed through to the attribute printers
        op: the message / operation to describe
    """
    print(f'Operation: {color(op.name, Colors.BOLD)}')
    # 'doc' may be absent when the spec was loaded without schema validation
    if 'doc' in op.yaml:
        print(op.yaml['doc'])

    consistent_dd_reply = False
    if 'do' in op.yaml and 'dump' in op.yaml and 'reply' in op.yaml['do'] and \
       op.yaml['do']['reply'] == op.yaml['dump'].get('reply'):
        consistent_dd_reply = True

    for mode in ['do', 'dump']:
        if mode in op.yaml:
            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set,
                             consistent_dd_reply=consistent_dd_reply)

    if 'attributes' in op.yaml.get('event', {}):
        print('\nEvent attributes:')
        print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set)

    if 'notify' in op.yaml:
        # 'notify' names another message; the attributes printed are those
        # of that message's reply (presumably the notification shares the
        # reply layout -- confirm against the spec documentation).
        ref_op = ynl.msgs.get(op.yaml['notify'])
        ref_yaml = ref_op.yaml if ref_op else {}
        ref_spec = ref_yaml.get('do') or ref_yaml.get('dump')
        if ref_spec:
            ref_attrs = ref_spec.get('reply', {}).get('attributes')
            if ref_attrs is None:
                # The preferred mode has no reply attributes, try the other
                # mode instead of failing with a KeyError
                ref_attrs = (ref_yaml.get('dump') or {}).get('reply', {}) \
                                                         .get('attributes')
            if ref_attrs is not None:
                print('\nNotification attributes:')
                print_attr_list(ynl, ref_attrs, op.attr_set)

    if 'mcgrp' in op.yaml:
        print(f"\nMulticast group: {op.yaml['mcgrp']}")


def _build_parser():
    """Create the argparse parser describing all options of the tool."""
    description = """
    YNL CLI utility - a general purpose netlink utility that uses YAML
    specs to drive protocol encoding and decoding.
    """
    epilog = """
    The --multi option can be repeated to include several do operations
    in the same netlink payload.
    """

    parser = argparse.ArgumentParser(description=description,
                                     epilog=epilog, add_help=False)

    gen_group = parser.add_argument_group('General options')
    gen_group.add_argument('-h', '--help', action='help',
                           help='show this help message and exit')

    spec_group = parser.add_argument_group('Netlink family selection')
    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
    spec_sel.add_argument('--list-families', action='store_true',
                          help=('list Netlink families supported by YNL '
                                '(which have a spec available in the standard '
                                'system path)'))
    spec_sel.add_argument('--family', dest='family', type=str,
                          help='name of the Netlink FAMILY to use')
    spec_sel.add_argument('--spec', dest='spec', type=str,
                          help='full file path to the YAML spec file')

    ops_group = parser.add_argument_group('Operations')
    ops = ops_group.add_mutually_exclusive_group()
    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
                     help="Multi-message operation sequence (for nftables)")
    ops.add_argument('--list-ops', action='store_true',
                     help="List available --do and --dump operations")
    ops.add_argument('--list-msgs', action='store_true',
                     help="List all messages of the family (incl. notifications)")
    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
                     type=str, help='List attributes for a message / operation')
    ops.add_argument('--validate', action='store_true',
                     help="Validate the spec against schema and exit")

    io_group = parser.add_argument_group('Input / Output')
    io_group.add_argument('--json', dest='json_text', type=str,
                          help=('Specify attributes of the message to send '
                                'to the kernel in JSON format. Can be left out '
                                'if the message is expected to be empty.'))
    io_group.add_argument('--output-json', action='store_true',
                          help='Format output as JSON')

    ntf_group = parser.add_argument_group('Notifications')
    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
    ntf_group.add_argument('--duration', dest='duration', type=int,
                           help='when subscribed, watch for DURATION seconds')
    ntf_group.add_argument('--sleep', dest='duration', type=int,
                           help='alias for duration')

    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
                                        ('Extra flags to set in nlmsg_flags of '
                                         'the request, used mostly by older '
                                         'Classic Netlink families.'))
    nlflags.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
    nlflags.add_argument('--excl', dest='flags', action='append_const',
                         const=Netlink.NLM_F_EXCL)
    nlflags.add_argument('--create', dest='flags', action='append_const',
                         const=Netlink.NLM_F_CREATE)
    nlflags.add_argument('--append', dest='flags', action='append_const',
                         const=Netlink.NLM_F_APPEND)

    schema_group = parser.add_argument_group('Development options')
    schema_group.add_argument('--schema', dest='schema', type=str,
                              help="JSON schema to validate the spec")
    schema_group.add_argument('--no-schema', action='store_true')

    dbg_group = parser.add_argument_group('Debug options')
    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
                           action='store', nargs='?', type=int, metavar='INT',
                           help="Length of buffers used for recv()")
    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)

    return parser


# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
    """YNL cli tool

    Parse the command line, locate the spec of the selected family and then
    perform the requested action: list families / operations / messages /
    attributes, validate the spec, or send do / dump / multi requests and
    print the replies and any subscribed notifications.

    Exits with status 1 when the spec fails validation, the message given to
    --list-attrs is unknown, or the kernel returns a Netlink error.

    Raises:
        YnlException: if the schema / spec directory or the spec file
            does not exist
    """
    args = _build_parser().parse_args()

    def output(msg):
        """Print one reply, as JSON with --output-json, pretty-printed otherwise."""
        if args.output_json:
            print(json.dumps(msg, cls=YnlEncoder))
        else:
            pprint.pprint(msg, width=term_width(), compact=True)

    if args.list_families:
        for filename in sorted(os.listdir(spec_dir())):
            if filename.endswith('.yaml'):
                print(filename.removesuffix('.yaml'))
        return

    if args.no_schema:
        args.schema = ''

    attrs = {}
    if args.json_text:
        attrs = json.loads(args.json_text)

    if args.family:
        spec = f"{spec_dir()}/{args.family}.yaml"
    else:
        spec = args.spec
    if not os.path.isfile(spec):
        raise YnlException(f"Spec file {spec} does not exist")

    if args.validate:
        try:
            SpecFamily(spec, args.schema)
        except SpecException as error:
            print(error)
            sys.exit(1)
        return

    if args.family:  # set behaviour when using installed specs
        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
            args.schema = ''  # disable schema validation when installed
        if args.process_unknown is None:
            args.process_unknown = True

    ynl = YnlFamily(spec, args.schema, args.process_unknown,
                    recv_size=args.dbg_small_recv)
    if args.dbg_small_recv:
        ynl.set_recv_dbg(True)

    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
    if args.list_msgs:
        for op_name, op in ynl.msgs.items():
            print(op_name, " [", ", ".join(op.modes), "]")

    if args.list_attrs:
        op = ynl.msgs.get(args.list_attrs)
        if not op:
            print(f'Operation {args.list_attrs} not found')
            sys.exit(1)

        do_doc(ynl, op)

    try:
        if args.do:
            reply = ynl.do(args.do, attrs, args.flags)
            output(reply)
        if args.dump:
            reply = ynl.dump(args.dump, attrs)
            output(reply)
        if args.multi:
            ops = [(item[0], json.loads(item[1]), args.flags or [])
                   for item in args.multi]
            reply = ynl.do_multi(ops)
            output(reply)

        if args.ntf:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
    except NlError as e:
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        pass
    except BrokenPipeError:
        # The reader went away (e.g. output piped into `head`). Python
        # flushes stdout once more at interpreter exit; point it at
        # /dev/null so that flush does not hit the closed pipe again and
        # report a second, unhandled BrokenPipeError.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        os.close(devnull)


if __name__ == "__main__":
    main()