1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 3 4""" 5YNL cli tool 6""" 7 8import argparse 9import json 10import os 11import pathlib 12import pprint 13import shutil 14import sys 15import textwrap 16 17# pylint: disable=no-name-in-module,wrong-import-position 18sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix()) 19from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException 20 21SYS_SCHEMA_DIR='/usr/share/ynl' 22RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink' 23 24# pylint: disable=too-few-public-methods,too-many-locals 25class Colors: 26 """ANSI color and font modifier codes""" 27 RESET = '\033[0m' 28 29 BOLD = '\033[1m' 30 ITALICS = '\033[3m' 31 UNDERLINE = '\033[4m' 32 INVERT = '\033[7m' 33 34 35def color(text, modifiers): 36 """Add color to text if output is a TTY 37 38 Returns: 39 Colored text if stdout is a TTY, otherwise plain text 40 """ 41 if sys.stdout.isatty(): 42 # Join the colors if they are a list, if it's a string this a noop 43 modifiers = "".join(modifiers) 44 return f"{modifiers}{text}{Colors.RESET}" 45 return text 46 47def schema_dir(): 48 """ 49 Return the effective schema directory, preferring in-tree before 50 system schema directory. 51 """ 52 script_dir = os.path.dirname(os.path.abspath(__file__)) 53 schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}") 54 if not os.path.isdir(schema_dir_): 55 schema_dir_ = SYS_SCHEMA_DIR 56 if not os.path.isdir(schema_dir_): 57 raise YnlException(f"Schema directory {schema_dir_} does not exist") 58 return schema_dir_ 59 60def spec_dir(): 61 """ 62 Return the effective spec directory, relative to the effective 63 schema directory. 64 """ 65 spec_dir_ = schema_dir() + '/specs' 66 if not os.path.isdir(spec_dir_): 67 raise YnlException(f"Spec directory {spec_dir_} does not exist") 68 return spec_dir_ 69 70 71class YnlEncoder(json.JSONEncoder): 72 """A custom encoder for emitting JSON with ynl-specific instance types""" 73 def default(self, o): 74 if isinstance(o, bytes): 75 return bytes.hex(o) 76 if isinstance(o, set): 77 return list(o) 78 return json.JSONEncoder.default(self, o) 79 80 81def print_attr_list(ynl, attr_names, attr_set, indent=2): 82 """Print a list of attributes with their types and documentation.""" 83 prefix = ' ' * indent 84 for attr_name in attr_names: 85 if attr_name in attr_set.attrs: 86 attr = attr_set.attrs[attr_name] 87 attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}' 88 if 'enum' in attr.yaml: 89 enum_name = attr.yaml['enum'] 90 attr_info += f" (enum: {enum_name})" 91 # Print enum values if available 92 if enum_name in ynl.consts: 93 const = ynl.consts[enum_name] 94 enum_values = list(const.entries.keys()) 95 type_fmted = color(const.type.capitalize(), Colors.ITALICS) 96 attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}" 97 98 # Show nested attributes reference and recursively display them 99 nested_set_name = None 100 if attr.type == 'nest' and 'nested-attributes' in attr.yaml: 101 nested_set_name = attr.yaml['nested-attributes'] 102 attr_info += f" -> {nested_set_name}" 103 104 if attr.yaml.get('doc'): 105 doc_prefix = prefix + ' ' * 4 106 term_width = shutil.get_terminal_size().columns 107 doc_text = textwrap.fill(attr.yaml['doc'], width=term_width, 108 initial_indent=doc_prefix, 109 subsequent_indent=doc_prefix) 110 attr_info += f"\n{doc_text}" 111 print(attr_info) 112 113 # Recursively show nested attributes 114 if nested_set_name in ynl.attr_sets: 115 nested_set = ynl.attr_sets[nested_set_name] 116 # Filter out 'unspec' and other unused attrs 117 nested_names = [n for n in nested_set.attrs.keys() 118 if nested_set.attrs[n].type != 'unused'] 119 if nested_names: 120 print_attr_list(ynl, nested_names, nested_set, indent + 4) 121 122 123def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None): 124 """Print a given mode (do/dump/event/notify).""" 125 mode_title = mode.capitalize() 126 127 if 'request' in mode_spec and 'attributes' in mode_spec['request']: 128 print(f'\n{mode_title} request attributes:') 129 print_attr_list(ynl, mode_spec['request']['attributes'], attr_set) 130 131 if 'reply' in mode_spec and 'attributes' in mode_spec['reply']: 132 if consistent_dd_reply and mode == "do": 133 title = None # Dump handling will print in combined format 134 elif consistent_dd_reply and mode == "dump": 135 title = 'Do and Dump' 136 else: 137 title = f'{mode_title}' 138 if title: 139 print(f'\n{title} reply attributes:') 140 print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set) 141 142 143def do_doc(ynl, op): 144 """Handle --list-attrs $op, print the attr information to stdout""" 145 print(f'Operation: {color(op.name, Colors.BOLD)}') 146 print(op.yaml['doc']) 147 148 consistent_dd_reply = False 149 if 'do' in op.yaml and 'dump' in op.yaml and 'reply' in op.yaml['do'] and \ 150 op.yaml['do']['reply'] == op.yaml['dump'].get('reply'): 151 consistent_dd_reply = True 152 153 for mode in ['do', 'dump']: 154 if mode in op.yaml: 155 print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, 156 consistent_dd_reply=consistent_dd_reply) 157 158 if 'attributes' in op.yaml.get('event', {}): 159 print('\nEvent attributes:') 160 print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set) 161 162 if 'notify' in op.yaml: 163 mode_spec = op.yaml['notify'] 164 ref_spec = ynl.msgs.get(mode_spec).yaml.get('do') 165 if not ref_spec: 166 ref_spec = ynl.msgs.get(mode_spec).yaml.get('dump') 167 if ref_spec: 168 print('\nNotification attributes:') 169 print_attr_list(ynl, ref_spec['reply']['attributes'], op.attr_set) 170 171 if 'mcgrp' in op.yaml: 172 print(f"\nMulticast group: {op.yaml['mcgrp']}") 173 174 175# pylint: disable=too-many-locals,too-many-branches,too-many-statements 176def main(): 177 """YNL cli tool""" 178 179 description = """ 180 YNL CLI utility - a general purpose netlink utility that uses YAML 181 specs to drive protocol encoding and decoding. 182 """ 183 epilog = """ 184 The --multi option can be repeated to include several do operations 185 in the same netlink payload. 186 """ 187 188 parser = argparse.ArgumentParser(description=description, 189 epilog=epilog, add_help=False) 190 191 gen_group = parser.add_argument_group('General options') 192 gen_group.add_argument('-h', '--help', action='help', 193 help='show this help message and exit') 194 195 spec_group = parser.add_argument_group('Netlink family selection') 196 spec_sel = spec_group.add_mutually_exclusive_group(required=True) 197 spec_sel.add_argument('--list-families', action='store_true', 198 help=('list Netlink families supported by YNL ' 199 '(which have a spec available in the standard ' 200 'system path)')) 201 spec_sel.add_argument('--family', dest='family', type=str, 202 help='name of the Netlink FAMILY to use') 203 spec_sel.add_argument('--spec', dest='spec', type=str, 204 help='full file path to the YAML spec file') 205 206 ops_group = parser.add_argument_group('Operations') 207 ops = ops_group.add_mutually_exclusive_group() 208 ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str) 209 ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str) 210 ops.add_argument('--multi', dest='multi', nargs=2, action='append', 211 metavar=('DO-OPERATION', 'JSON_TEXT'), type=str, 212 help="Multi-message operation sequence (for nftables)") 213 ops.add_argument('--list-ops', action='store_true', 214 help="List available --do and --dump operations") 215 ops.add_argument('--list-msgs', action='store_true', 216 help="List all messages of the family (incl. notifications)") 217 ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG', 218 type=str, help='List attributes for a message / operation') 219 ops.add_argument('--validate', action='store_true', 220 help="Validate the spec against schema and exit") 221 222 io_group = parser.add_argument_group('Input / Output') 223 io_group.add_argument('--json', dest='json_text', type=str, 224 help=('Specify attributes of the message to send ' 225 'to the kernel in JSON format. Can be left out ' 226 'if the message is expected to be empty.')) 227 io_group.add_argument('--output-json', action='store_true', 228 help='Format output as JSON') 229 230 ntf_group = parser.add_argument_group('Notifications') 231 ntf_group.add_argument('--subscribe', dest='ntf', type=str) 232 ntf_group.add_argument('--duration', dest='duration', type=int, 233 help='when subscribed, watch for DURATION seconds') 234 ntf_group.add_argument('--sleep', dest='duration', type=int, 235 help='alias for duration') 236 237 nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)', 238 ('Extra flags to set in nlmsg_flags of ' 239 'the request, used mostly by older ' 240 'Classic Netlink families.')) 241 nlflags.add_argument('--replace', dest='flags', action='append_const', 242 const=Netlink.NLM_F_REPLACE) 243 nlflags.add_argument('--excl', dest='flags', action='append_const', 244 const=Netlink.NLM_F_EXCL) 245 nlflags.add_argument('--create', dest='flags', action='append_const', 246 const=Netlink.NLM_F_CREATE) 247 nlflags.add_argument('--append', dest='flags', action='append_const', 248 const=Netlink.NLM_F_APPEND) 249 250 schema_group = parser.add_argument_group('Development options') 251 schema_group.add_argument('--schema', dest='schema', type=str, 252 help="JSON schema to validate the spec") 253 schema_group.add_argument('--no-schema', action='store_true') 254 255 dbg_group = parser.add_argument_group('Debug options') 256 dbg_group.add_argument('--dbg-small-recv', default=0, const=4000, 257 action='store', nargs='?', type=int, metavar='INT', 258 help="Length of buffers used for recv()") 259 dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction) 260 261 args = parser.parse_args() 262 263 def output(msg): 264 if args.output_json: 265 print(json.dumps(msg, cls=YnlEncoder)) 266 else: 267 pprint.PrettyPrinter().pprint(msg) 268 269 if args.list_families: 270 for filename in sorted(os.listdir(spec_dir())): 271 if filename.endswith('.yaml'): 272 print(filename.removesuffix('.yaml')) 273 return 274 275 if args.no_schema: 276 args.schema = '' 277 278 attrs = {} 279 if args.json_text: 280 attrs = json.loads(args.json_text) 281 282 if args.family: 283 spec = f"{spec_dir()}/{args.family}.yaml" 284 else: 285 spec = args.spec 286 if not os.path.isfile(spec): 287 raise YnlException(f"Spec file {spec} does not exist") 288 289 if args.validate: 290 try: 291 SpecFamily(spec, args.schema) 292 except SpecException as error: 293 print(error) 294 sys.exit(1) 295 return 296 297 if args.family: # set behaviour when using installed specs 298 if args.schema is None and spec.startswith(SYS_SCHEMA_DIR): 299 args.schema = '' # disable schema validation when installed 300 if args.process_unknown is None: 301 args.process_unknown = True 302 303 ynl = YnlFamily(spec, args.schema, args.process_unknown, 304 recv_size=args.dbg_small_recv) 305 if args.dbg_small_recv: 306 ynl.set_recv_dbg(True) 307 308 if args.ntf: 309 ynl.ntf_subscribe(args.ntf) 310 311 if args.list_ops: 312 for op_name, op in ynl.ops.items(): 313 print(op_name, " [", ", ".join(op.modes), "]") 314 if args.list_msgs: 315 for op_name, op in ynl.msgs.items(): 316 print(op_name, " [", ", ".join(op.modes), "]") 317 318 if args.list_attrs: 319 op = ynl.msgs.get(args.list_attrs) 320 if not op: 321 print(f'Operation {args.list_attrs} not found') 322 sys.exit(1) 323 324 do_doc(ynl, op) 325 326 try: 327 if args.do: 328 reply = ynl.do(args.do, attrs, args.flags) 329 output(reply) 330 if args.dump: 331 reply = ynl.dump(args.dump, attrs) 332 output(reply) 333 if args.multi: 334 ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ] 335 reply = ynl.do_multi(ops) 336 output(reply) 337 338 if args.ntf: 339 for msg in ynl.poll_ntf(duration=args.duration): 340 output(msg) 341 except NlError as e: 342 print(e) 343 sys.exit(1) 344 except KeyboardInterrupt: 345 pass 346 except BrokenPipeError: 347 pass 348 349 350if __name__ == "__main__": 351 main() 352