#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

"""
YNL cli tool
"""

import argparse
import json
import os
import pathlib
import pprint
import shutil
import sys
import textwrap

# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException

SYS_SCHEMA_DIR='/usr/share/ynl'
RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'

# pylint: disable=too-few-public-methods,too-many-locals
class Colors:
    """ANSI color and font modifier codes"""
    RESET = '\033[0m'

    BOLD = '\033[1m'
    ITALICS = '\033[3m'
    UNDERLINE = '\033[4m'
    INVERT = '\033[7m'


def color(text, modifiers):
    """Add color to text if output is a TTY

    Args:
        text: The text to decorate.
        modifiers: A single modifier string or an iterable of modifier
            strings (see Colors) to emit in front of the text.

    Returns:
        Colored text if stdout is a TTY, otherwise plain text
    """
    if sys.stdout.isatty():
        # Join the colors if they are a list, if it's a string this is a noop
        modifiers = "".join(modifiers)
        return f"{modifiers}{text}{Colors.RESET}"
    return text

def schema_dir():
    """
    Return the effective schema directory, preferring in-tree before
    system schema directory.

    Raises:
        YnlException: if neither the in-tree nor the system schema
            directory exists.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
    if not os.path.isdir(schema_dir_):
        schema_dir_ = SYS_SCHEMA_DIR
    if not os.path.isdir(schema_dir_):
        raise YnlException(f"Schema directory {schema_dir_} does not exist")
    return schema_dir_

def spec_dir():
    """
    Return the effective spec directory, relative to the effective
    schema directory.

    Raises:
        YnlException: if the schema directory cannot be found or it has
            no 'specs' sub-directory.
    """
    spec_dir_ = schema_dir() + '/specs'
    if not os.path.isdir(spec_dir_):
        raise YnlException(f"Spec directory {spec_dir_} does not exist")
    return spec_dir_


class YnlEncoder(json.JSONEncoder):
    """A custom encoder for emitting JSON with ynl-specific instance types"""
    def default(self, o):
        """Encode bytes as a hex string and sets as lists.

        Anything else is delegated to the base JSONEncoder, which raises
        TypeError for types it cannot serialize.
        """
        if isinstance(o, bytes):
            return bytes.hex(o)
        if isinstance(o, set):
            return list(o)
        return json.JSONEncoder.default(self, o)


def print_attr_list(ynl, attr_names, attr_set, indent=2):
    """Print a list of attributes with their types and documentation.

    Names in attr_names which are not present in attr_set are skipped.
    Nested attribute sets are printed recursively with a deeper indent.

    Args:
        ynl: The loaded family; its consts and attr_sets are used to look
            up enum values and nested attribute sets.
        attr_names: Iterable of attribute names to print.
        attr_set: The attribute set the names are looked up in.
        indent: Number of spaces to put in front of each entry.
    """
    prefix = ' ' * indent
    for attr_name in attr_names:
        if attr_name in attr_set.attrs:
            attr = attr_set.attrs[attr_name]
            attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
            if 'enum' in attr.yaml:
                enum_name = attr.yaml['enum']
                attr_info += f" (enum: {enum_name})"
                # Print enum values if available
                if enum_name in ynl.consts:
                    const = ynl.consts[enum_name]
                    enum_values = list(const.entries.keys())
                    type_fmted = color(const.type.capitalize(), Colors.ITALICS)
                    attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}"

            # Show nested attributes reference and recursively display them
            nested_set_name = None
            if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
                nested_set_name = attr.yaml['nested-attributes']
                attr_info += f" -> {nested_set_name}"

            if attr.yaml.get('doc'):
                # Wrap the doc text to the terminal, indented under the entry
                doc_prefix = prefix + ' ' * 4
                term_width = shutil.get_terminal_size().columns
                doc_text = textwrap.fill(attr.yaml['doc'], width=term_width,
                                         initial_indent=doc_prefix,
                                         subsequent_indent=doc_prefix)
                attr_info += f"\n{doc_text}"
            print(attr_info)

            # Recursively show nested attributes
            if nested_set_name in ynl.attr_sets:
                nested_set = ynl.attr_sets[nested_set_name]
                # Filter out 'unspec' and other unused attrs
                nested_names = [n for n in nested_set.attrs.keys()
                                if nested_set.attrs[n].type != 'unused']
                if nested_names:
                    print_attr_list(ynl, nested_names, nested_set, indent + 4)


def print_mode_attrs(ynl, mode, mode_spec, attr_set):
    """Print a given mode (do/dump/event/notify).

    Prints the request and the reply attribute lists of mode_spec, each
    only when the spec actually carries an 'attributes' entry for it.
    """
    mode_title = mode.capitalize()

    if 'request' in mode_spec and 'attributes' in mode_spec['request']:
        print(f'\n{mode_title} request attributes:')
        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)

    if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
        print(f'\n{mode_title} reply attributes:')
        print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)


def do_doc(ynl, op):
    """Handle --list-attrs $op, print the attr information to stdout

    Every section is optional: an operation without 'doc', a notification
    which refers to a message missing from the family, or a referenced
    operation without reply attributes is skipped instead of raising.

    Args:
        ynl: The loaded family, used to resolve notifications and nests.
        op: The message / operation to describe.
    """
    print(f'Operation: {color(op.name, Colors.BOLD)}')
    # 'doc' may be absent when the spec was loaded without schema validation
    doc = op.yaml.get('doc')
    if doc:
        print(doc)

    for mode in ['do', 'dump']:
        if mode in op.yaml:
            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set)

    if 'attributes' in op.yaml.get('event', {}):
        print('\nEvent attributes:')
        print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set)

    if 'notify' in op.yaml:
        # A notification carries the reply of the operation it refers to,
        # take the attribute list from its 'do', falling back to 'dump'.
        ref_op = ynl.msgs.get(op.yaml['notify'])
        ref_spec = None
        if ref_op is not None:
            ref_spec = ref_op.yaml.get('do') or ref_op.yaml.get('dump')
        if ref_spec and 'attributes' in ref_spec.get('reply', {}):
            print('\nNotification attributes:')
            print_attr_list(ynl, ref_spec['reply']['attributes'], op.attr_set)

    if 'mcgrp' in op.yaml:
        print(f"\nMulticast group: {op.yaml['mcgrp']}")


# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
    """YNL cli tool

    Parse the command line, load the selected spec and run the requested
    listing, validation, do / dump / multi operation and / or notification
    subscription. Exits with status 1 on spec validation failure, unknown
    --list-attrs message or a Netlink error.
    """

    description = """
    YNL CLI utility - a general purpose netlink utility that uses YAML
    specs to drive protocol encoding and decoding.
    """
    epilog = """
    The --multi option can be repeated to include several do operations
    in the same netlink payload.
    """

    # add_help=False so that -h/--help can be placed in our own group
    parser = argparse.ArgumentParser(description=description,
                                     epilog=epilog, add_help=False)

    gen_group = parser.add_argument_group('General options')
    gen_group.add_argument('-h', '--help', action='help',
                           help='show this help message and exit')

    spec_group = parser.add_argument_group('Netlink family selection')
    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
    spec_sel.add_argument('--list-families', action='store_true',
                          help=('list Netlink families supported by YNL '
                                '(which have a spec available in the standard '
                                'system path)'))
    spec_sel.add_argument('--family', dest='family', type=str,
                          help='name of the Netlink FAMILY to use')
    spec_sel.add_argument('--spec', dest='spec', type=str,
                          help='full file path to the YAML spec file')

    ops_group = parser.add_argument_group('Operations')
    ops = ops_group.add_mutually_exclusive_group()
    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
                     help="Multi-message operation sequence (for nftables)")
    ops.add_argument('--list-ops', action='store_true',
                     help="List available --do and --dump operations")
    ops.add_argument('--list-msgs', action='store_true',
                     help="List all messages of the family (incl. notifications)")
    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
                     type=str, help='List attributes for a message / operation')
    ops.add_argument('--validate', action='store_true',
                     help="Validate the spec against schema and exit")

    io_group = parser.add_argument_group('Input / Output')
    io_group.add_argument('--json', dest='json_text', type=str,
                          help=('Specify attributes of the message to send '
                                'to the kernel in JSON format. Can be left out '
                                'if the message is expected to be empty.'))
    io_group.add_argument('--output-json', action='store_true',
                          help='Format output as JSON')

    ntf_group = parser.add_argument_group('Notifications')
    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
    ntf_group.add_argument('--duration', dest='duration', type=int,
                           help='when subscribed, watch for DURATION seconds')
    ntf_group.add_argument('--sleep', dest='duration', type=int,
                           help='alias for duration')

    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
                                        ('Extra flags to set in nlmsg_flags of '
                                         'the request, used mostly by older '
                                         'Classic Netlink families.'))
    # All flag options append to the same list, args.flags stays None
    # when none of them is given.
    nlflags.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
    nlflags.add_argument('--excl', dest='flags', action='append_const',
                         const=Netlink.NLM_F_EXCL)
    nlflags.add_argument('--create', dest='flags', action='append_const',
                         const=Netlink.NLM_F_CREATE)
    nlflags.add_argument('--append', dest='flags', action='append_const',
                         const=Netlink.NLM_F_APPEND)

    schema_group = parser.add_argument_group('Development options')
    schema_group.add_argument('--schema', dest='schema', type=str,
                              help="JSON schema to validate the spec")
    schema_group.add_argument('--no-schema', action='store_true')

    dbg_group = parser.add_argument_group('Debug options')
    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
                           action='store', nargs='?', type=int, metavar='INT',
                           help="Length of buffers used for recv()")
    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)

    args = parser.parse_args()

    def output(msg):
        if args.output_json:
            print(json.dumps(msg, cls=YnlEncoder))
        else:
            pprint.PrettyPrinter().pprint(msg)

    if args.list_families:
        for filename in sorted(os.listdir(spec_dir())):
            if filename.endswith('.yaml'):
                print(filename.removesuffix('.yaml'))
        return

    # An empty schema string is how schema validation gets disabled below
    if args.no_schema:
        args.schema = ''

    attrs = {}
    if args.json_text:
        attrs = json.loads(args.json_text)

    if args.family:
        spec = f"{spec_dir()}/{args.family}.yaml"
    else:
        spec = args.spec
    if not os.path.isfile(spec):
        raise YnlException(f"Spec file {spec} does not exist")

    if args.validate:
        try:
            SpecFamily(spec, args.schema)
        except SpecException as error:
            print(error)
            sys.exit(1)
        return

    if args.family: # set behaviour when using installed specs
        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
            args.schema = '' # disable schema validation when installed
        if args.process_unknown is None:
            args.process_unknown = True

    ynl = YnlFamily(spec, args.schema, args.process_unknown,
                    recv_size=args.dbg_small_recv)
    if args.dbg_small_recv:
        ynl.set_recv_dbg(True)

    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
    if args.list_msgs:
        for op_name, op in ynl.msgs.items():
            print(op_name, " [", ", ".join(op.modes), "]")

    if args.list_attrs:
        op = ynl.msgs.get(args.list_attrs)
        if not op:
            print(f'Operation {args.list_attrs} not found')
            sys.exit(1)

        do_doc(ynl, op)

    try:
        if args.do:
            reply = ynl.do(args.do, attrs, args.flags)
            output(reply)
        if args.dump:
            reply = ynl.dump(args.dump, attrs)
            output(reply)
        if args.multi:
            ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
            reply = ynl.do_multi(ops)
            output(reply)

        if args.ntf:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
    except NlError as e:
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop watching notifications
        pass
    except BrokenPipeError:
        # The reader of our output went away, nothing left to report
        pass


if __name__ == "__main__":
    main()