1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 3 4""" 5YNL cli tool 6""" 7 8import argparse 9import json 10import os 11import pathlib 12import pprint 13import shutil 14import sys 15import textwrap 16 17# pylint: disable=no-name-in-module,wrong-import-position 18sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix()) 19from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException 20 21SYS_SCHEMA_DIR='/usr/share/ynl' 22RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink' 23 24# pylint: disable=too-few-public-methods,too-many-locals 25class Colors: 26 """ANSI color and font modifier codes""" 27 RESET = '\033[0m' 28 29 BOLD = '\033[1m' 30 ITALICS = '\033[3m' 31 UNDERLINE = '\033[4m' 32 INVERT = '\033[7m' 33 34 35def color(text, modifiers): 36 """Add color to text if output is a TTY 37 38 Returns: 39 Colored text if stdout is a TTY, otherwise plain text 40 """ 41 if sys.stdout.isatty(): 42 # Join the colors if they are a list, if it's a string this a noop 43 modifiers = "".join(modifiers) 44 return f"{modifiers}{text}{Colors.RESET}" 45 return text 46 47def term_width(): 48 """ Get terminal width in columns (80 if stdout is not a terminal) """ 49 return shutil.get_terminal_size().columns 50 51def schema_dir(): 52 """ 53 Return the effective schema directory, preferring in-tree before 54 system schema directory. 55 """ 56 script_dir = os.path.dirname(os.path.abspath(__file__)) 57 schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}") 58 if not os.path.isdir(schema_dir_): 59 schema_dir_ = SYS_SCHEMA_DIR 60 if not os.path.isdir(schema_dir_): 61 raise YnlException(f"Schema directory {schema_dir_} does not exist") 62 return schema_dir_ 63 64def spec_dir(): 65 """ 66 Return the effective spec directory, relative to the effective 67 schema directory. 68 """ 69 spec_dir_ = schema_dir() + '/specs' 70 if not os.path.isdir(spec_dir_): 71 raise YnlException(f"Spec directory {spec_dir_} does not exist") 72 return spec_dir_ 73 74 75class YnlEncoder(json.JSONEncoder): 76 """A custom encoder for emitting JSON with ynl-specific instance types""" 77 def default(self, o): 78 if isinstance(o, bytes): 79 return bytes.hex(o) 80 if isinstance(o, set): 81 return sorted(o) 82 return json.JSONEncoder.default(self, o) 83 84 85def print_attr_list(ynl, attr_names, attr_set, indent=2): 86 """Print a list of attributes with their types and documentation.""" 87 prefix = ' ' * indent 88 for attr_name in attr_names: 89 if attr_name in attr_set.attrs: 90 attr = attr_set.attrs[attr_name] 91 attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}' 92 if 'enum' in attr.yaml: 93 enum_name = attr.yaml['enum'] 94 attr_info += f" (enum: {enum_name})" 95 # Print enum values if available 96 if enum_name in ynl.consts: 97 const = ynl.consts[enum_name] 98 enum_values = list(const.entries.keys()) 99 type_fmted = color(const.type.capitalize(), Colors.ITALICS) 100 attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}" 101 102 # Show nested attributes reference and recursively display them 103 nested_set_name = None 104 if attr.type == 'nest' and 'nested-attributes' in attr.yaml: 105 nested_set_name = attr.yaml['nested-attributes'] 106 attr_info += f" -> {nested_set_name}" 107 108 if attr.yaml.get('doc'): 109 doc_prefix = prefix + ' ' * 4 110 doc_text = textwrap.fill(attr.yaml['doc'], width=term_width(), 111 initial_indent=doc_prefix, 112 subsequent_indent=doc_prefix) 113 attr_info += f"\n{doc_text}" 114 print(attr_info) 115 116 # Recursively show nested attributes 117 if nested_set_name in ynl.attr_sets: 118 nested_set = ynl.attr_sets[nested_set_name] 119 # Filter out 'unspec' and other unused attrs 120 nested_names = [n for n in nested_set.attrs.keys() 121 if nested_set.attrs[n].type != 'unused'] 122 if nested_names: 123 print_attr_list(ynl, nested_names, nested_set, indent + 4) 124 125 126def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None): 127 """Print a given mode (do/dump/event/notify).""" 128 mode_title = mode.capitalize() 129 130 if 'request' in mode_spec and 'attributes' in mode_spec['request']: 131 print(f'\n{mode_title} request attributes:') 132 print_attr_list(ynl, mode_spec['request']['attributes'], attr_set) 133 134 if 'reply' in mode_spec and 'attributes' in mode_spec['reply']: 135 if consistent_dd_reply and mode == "do": 136 title = None # Dump handling will print in combined format 137 elif consistent_dd_reply and mode == "dump": 138 title = 'Do and Dump' 139 else: 140 title = f'{mode_title}' 141 if title: 142 print(f'\n{title} reply attributes:') 143 print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set) 144 145 146def do_doc(ynl, op): 147 """Handle --list-attrs $op, print the attr information to stdout""" 148 print(f'Operation: {color(op.name, Colors.BOLD)}') 149 print(op.yaml['doc']) 150 151 consistent_dd_reply = False 152 if 'do' in op.yaml and 'dump' in op.yaml and 'reply' in op.yaml['do'] and \ 153 op.yaml['do']['reply'] == op.yaml['dump'].get('reply'): 154 consistent_dd_reply = True 155 156 for mode in ['do', 'dump']: 157 if mode in op.yaml: 158 print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, 159 consistent_dd_reply=consistent_dd_reply) 160 161 if 'attributes' in op.yaml.get('event', {}): 162 print('\nEvent attributes:') 163 print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set) 164 165 if 'notify' in op.yaml: 166 mode_spec = op.yaml['notify'] 167 ref_spec = ynl.msgs.get(mode_spec).yaml.get('do') 168 if not ref_spec: 169 ref_spec = ynl.msgs.get(mode_spec).yaml.get('dump') 170 if ref_spec: 171 print('\nNotification attributes:') 172 print_attr_list(ynl, ref_spec['reply']['attributes'], op.attr_set) 173 174 if 'mcgrp' in op.yaml: 175 print(f"\nMulticast group: {op.yaml['mcgrp']}") 176 177 178# pylint: disable=too-many-locals,too-many-branches,too-many-statements 179def main(): 180 """YNL cli tool""" 181 182 description = """ 183 YNL CLI utility - a general purpose netlink utility that uses YAML 184 specs to drive protocol encoding and decoding. 185 """ 186 epilog = """ 187 The --multi option can be repeated to include several do operations 188 in the same netlink payload. 189 """ 190 191 parser = argparse.ArgumentParser(description=description, 192 epilog=epilog, add_help=False) 193 194 gen_group = parser.add_argument_group('General options') 195 gen_group.add_argument('-h', '--help', action='help', 196 help='show this help message and exit') 197 198 spec_group = parser.add_argument_group('Netlink family selection') 199 spec_sel = spec_group.add_mutually_exclusive_group(required=True) 200 spec_sel.add_argument('--list-families', action='store_true', 201 help=('list Netlink families supported by YNL ' 202 '(which have a spec available in the standard ' 203 'system path)')) 204 spec_sel.add_argument('--family', dest='family', type=str, 205 help='name of the Netlink FAMILY to use') 206 spec_sel.add_argument('--spec', dest='spec', type=str, 207 help='full file path to the YAML spec file') 208 209 ops_group = parser.add_argument_group('Operations') 210 ops = ops_group.add_mutually_exclusive_group() 211 ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str) 212 ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str) 213 ops.add_argument('--multi', dest='multi', nargs=2, action='append', 214 metavar=('DO-OPERATION', 'JSON_TEXT'), type=str, 215 help="Multi-message operation sequence (for nftables)") 216 ops.add_argument('--list-ops', action='store_true', 217 help="List available --do and --dump operations") 218 ops.add_argument('--list-msgs', action='store_true', 219 help="List all messages of the family (incl. notifications)") 220 ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG', 221 type=str, help='List attributes for a message / operation') 222 ops.add_argument('--validate', action='store_true', 223 help="Validate the spec against schema and exit") 224 225 io_group = parser.add_argument_group('Input / Output') 226 io_group.add_argument('--json', dest='json_text', type=str, 227 help=('Specify attributes of the message to send ' 228 'to the kernel in JSON format. Can be left out ' 229 'if the message is expected to be empty.')) 230 io_group.add_argument('--output-json', action='store_true', 231 help='Format output as JSON') 232 233 ntf_group = parser.add_argument_group('Notifications') 234 ntf_group.add_argument('--subscribe', dest='ntf', type=str) 235 ntf_group.add_argument('--duration', dest='duration', type=int, 236 help='when subscribed, watch for DURATION seconds') 237 ntf_group.add_argument('--sleep', dest='duration', type=int, 238 help='alias for duration') 239 240 nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)', 241 ('Extra flags to set in nlmsg_flags of ' 242 'the request, used mostly by older ' 243 'Classic Netlink families.')) 244 nlflags.add_argument('--replace', dest='flags', action='append_const', 245 const=Netlink.NLM_F_REPLACE) 246 nlflags.add_argument('--excl', dest='flags', action='append_const', 247 const=Netlink.NLM_F_EXCL) 248 nlflags.add_argument('--create', dest='flags', action='append_const', 249 const=Netlink.NLM_F_CREATE) 250 nlflags.add_argument('--append', dest='flags', action='append_const', 251 const=Netlink.NLM_F_APPEND) 252 253 schema_group = parser.add_argument_group('Development options') 254 schema_group.add_argument('--schema', dest='schema', type=str, 255 help="JSON schema to validate the spec") 256 schema_group.add_argument('--no-schema', action='store_true') 257 258 dbg_group = parser.add_argument_group('Debug options') 259 io_group.add_argument('--policy', action='store_true', 260 help='Query kernel policy for the operation instead of executing it') 261 dbg_group.add_argument('--dbg-small-recv', default=0, const=4000, 262 action='store', nargs='?', type=int, metavar='INT', 263 help="Length of buffers used for recv()") 264 dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction) 265 266 args = parser.parse_args() 267 268 def output(msg): 269 if args.output_json: 270 print(json.dumps(msg, cls=YnlEncoder)) 271 else: 272 pprint.pprint(msg, width=term_width(), compact=True) 273 274 if args.list_families: 275 for filename in sorted(os.listdir(spec_dir())): 276 if filename.endswith('.yaml'): 277 print(filename.removesuffix('.yaml')) 278 return 279 280 if args.no_schema: 281 args.schema = '' 282 283 attrs = {} 284 if args.json_text: 285 attrs = json.loads(args.json_text) 286 287 if args.family: 288 spec = f"{spec_dir()}/{args.family}.yaml" 289 else: 290 spec = args.spec 291 if not os.path.isfile(spec): 292 raise YnlException(f"Spec file {spec} does not exist") 293 294 if args.validate: 295 try: 296 SpecFamily(spec, args.schema) 297 except SpecException as error: 298 print(error) 299 sys.exit(1) 300 return 301 302 if args.family: # set behaviour when using installed specs 303 if args.schema is None and spec.startswith(SYS_SCHEMA_DIR): 304 args.schema = '' # disable schema validation when installed 305 if args.process_unknown is None: 306 args.process_unknown = True 307 308 ynl = YnlFamily(spec, args.schema, args.process_unknown, 309 recv_size=args.dbg_small_recv) 310 if args.dbg_small_recv: 311 ynl.set_recv_dbg(True) 312 313 if args.policy: 314 if args.do: 315 pol = ynl.get_policy(args.do, 'do') 316 output(pol.to_dict() if pol else None) 317 args.do = None 318 if args.dump: 319 pol = ynl.get_policy(args.dump, 'dump') 320 output(pol.to_dict() if pol else None) 321 args.dump = None 322 323 if args.ntf: 324 ynl.ntf_subscribe(args.ntf) 325 326 if args.list_ops: 327 for op_name, op in ynl.ops.items(): 328 print(op_name, " [", ", ".join(op.modes), "]") 329 if args.list_msgs: 330 for op_name, op in ynl.msgs.items(): 331 print(op_name, " [", ", ".join(op.modes), "]") 332 333 if args.list_attrs: 334 op = ynl.msgs.get(args.list_attrs) 335 if not op: 336 print(f'Operation {args.list_attrs} not found') 337 sys.exit(1) 338 339 do_doc(ynl, op) 340 341 try: 342 if args.do: 343 reply = ynl.do(args.do, attrs, args.flags) 344 output(reply) 345 if args.dump: 346 reply = ynl.dump(args.dump, attrs) 347 output(reply) 348 if args.multi: 349 ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ] 350 reply = ynl.do_multi(ops) 351 output(reply) 352 353 if args.ntf: 354 for msg in ynl.poll_ntf(duration=args.duration): 355 output(msg) 356 except NlError as e: 357 print(e) 358 sys.exit(1) 359 except KeyboardInterrupt: 360 pass 361 except BrokenPipeError: 362 pass 363 364 365if __name__ == "__main__": 366 main() 367