#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

"""
YNL cli tool
"""

import argparse
import json
import os
import pathlib
import pprint
import shutil
import sys
import textwrap

# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException

SYS_SCHEMA_DIR='/usr/share/ynl'
RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'

# pylint: disable=too-few-public-methods,too-many-locals
class Colors:
    """ANSI color and font modifier codes"""
    RESET = '\033[0m'

    BOLD = '\033[1m'
    ITALICS = '\033[3m'
    UNDERLINE = '\033[4m'
    INVERT = '\033[7m'


def color(text, modifiers):
    """Add color to text if output is a TTY

    Args:
        text: the text to decorate
        modifiers: a single modifier string or an iterable of modifier
            strings (see Colors); they are concatenated in order

    Returns:
        Colored text if stdout is a TTY, otherwise plain text
    """
    if sys.stdout.isatty():
        # Join the colors if they are a list, if it's a string this a noop
        modifiers = "".join(modifiers)
        return f"{modifiers}{text}{Colors.RESET}"
    return text


def schema_dir():
    """
    Return the effective schema directory, preferring in-tree before
    system schema directory.

    Raises:
        YnlException: if neither the in-tree directory (resolved relative
            to this script) nor SYS_SCHEMA_DIR exists
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
    if not os.path.isdir(schema_dir_):
        schema_dir_ = SYS_SCHEMA_DIR
    if not os.path.isdir(schema_dir_):
        raise YnlException(f"Schema directory {schema_dir_} does not exist")
    return schema_dir_


def spec_dir():
    """
    Return the effective spec directory, relative to the effective
    schema directory.

    Raises:
        YnlException: if the schema directory cannot be found, or it has
            no 'specs' sub-directory
    """
    spec_dir_ = schema_dir() + '/specs'
    if not os.path.isdir(spec_dir_):
        raise YnlException(f"Spec directory {spec_dir_} does not exist")
    return spec_dir_


class YnlEncoder(json.JSONEncoder):
    """A custom encoder for emitting JSON with ynl-specific instance types"""
    def default(self, o):
        """Encode bytes as a hex string and sets as lists.

        Anything else is delegated to json.JSONEncoder.default().
        """
        if isinstance(o, bytes):
            return bytes.hex(o)
        if isinstance(o, set):
            return list(o)
        return json.JSONEncoder.default(self, o)


def print_attr_list(ynl, attr_names, attr_set, indent=2, _visiting=None):
    """Print a list of attributes with their types and documentation.

    Names from attr_names which are not present in attr_set.attrs are
    silently skipped. For 'nest' attributes the referenced attribute set
    is printed recursively with a deeper indent.

    Args:
        ynl: family object; its 'consts' and 'attr_sets' mappings are used
            to resolve enum and nested attribute set names
        attr_names: iterable of attribute names to print
        attr_set: attribute set the names are looked up in
        indent: number of spaces to prefix each printed entry with
        _visiting: internal, callers should not pass it. Names of the
            nested attribute sets currently being expanded on the way to
            this call, used to stop on recursive nests.
    """
    if _visiting is None:
        _visiting = frozenset()

    prefix = ' ' * indent
    for attr_name in attr_names:
        if attr_name not in attr_set.attrs:
            continue

        attr = attr_set.attrs[attr_name]
        attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
        if 'enum' in attr.yaml:
            enum_name = attr.yaml['enum']
            attr_info += f" (enum: {enum_name})"
            # Print enum values if available
            if enum_name in ynl.consts:
                const = ynl.consts[enum_name]
                enum_values = list(const.entries.keys())
                type_fmted = color(const.type.capitalize(), Colors.ITALICS)
                attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}"

        # Show nested attributes reference and recursively display them
        nested_set_name = None
        if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
            nested_set_name = attr.yaml['nested-attributes']
            attr_info += f" -> {nested_set_name}"

        if attr.yaml.get('doc'):
            doc_prefix = prefix + ' ' * 4
            term_width = shutil.get_terminal_size().columns
            doc_text = textwrap.fill(attr.yaml['doc'], width=term_width,
                                     initial_indent=doc_prefix,
                                     subsequent_indent=doc_prefix)
            attr_info += f"\n{doc_text}"
        print(attr_info)

        # Recursively show nested attributes
        if nested_set_name not in ynl.attr_sets:
            continue
        # An attribute set may nest itself, directly or via other sets.
        # Only the reference is shown for a set which is already being
        # expanded, otherwise the recursion would never terminate.
        if nested_set_name in _visiting:
            continue

        nested_set = ynl.attr_sets[nested_set_name]
        # Filter out 'unspec' and other unused attrs
        nested_names = [n for n in nested_set.attrs.keys()
                        if nested_set.attrs[n].type != 'unused']
        if nested_names:
            print_attr_list(ynl, nested_names, nested_set, indent + 4,
                            _visiting | {nested_set_name})


def print_mode_attrs(ynl, mode, mode_spec, attr_set, print_request=True):
    """Print a given mode (do/dump/event/notify).

    Args:
        ynl: family object, passed through to print_attr_list()
        mode: name of the mode, capitalized and used in the headings
        mode_spec: mapping describing the mode; its 'request', 'reply'
            and 'attributes' entries are printed when present
        attr_set: attribute set the listed attribute names belong to
        print_request: when False the request attributes are not printed
    """
    mode_title = mode.capitalize()

    if print_request and 'request' in mode_spec and 'attributes' in mode_spec['request']:
        print(f'\n{mode_title} request attributes:')
        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)

    if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
        print(f'\n{mode_title} reply attributes:')
        print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)

    if 'attributes' in mode_spec:
        print(f'\n{mode_title} attributes:')
        print_attr_list(ynl, mode_spec['attributes'], attr_set)


def do_doc(ynl, op):
    """Handle --list-attrs $op, print the attr information to stdout"""
    print(f'Operation: {color(op.name, Colors.BOLD)}')
    # Do not assume the spec carries a description for the operation
    if 'doc' in op.yaml:
        print(op.yaml['doc'])

    for mode in ['do', 'dump', 'event']:
        if mode in op.yaml:
            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, True)

    if 'notify' in op.yaml:
        # A notification names another message and reuses the attributes
        # of that message's 'do'. Skip quietly if the name does not
        # resolve instead of failing on a None lookup result.
        ref_op = ynl.msgs.get(op.yaml['notify'])
        ref_spec = ref_op.yaml.get('do') if ref_op is not None else None
        if ref_spec:
            print_mode_attrs(ynl, 'notify', ref_spec, op.attr_set, False)

    if 'mcgrp' in op.yaml:
        print(f"\nMulticast group: {op.yaml['mcgrp']}")


# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
    """YNL cli tool

    Parse the command line, load the selected spec and perform the
    requested listing, validation, do / dump / multi operation and / or
    notification subscription. Exits with status 1 when spec validation
    fails, when the message given to --list-attrs is not found, or when
    an NlError is raised while talking to the kernel.
    """

    description = """
    YNL CLI utility - a general purpose netlink utility that uses YAML
    specs to drive protocol encoding and decoding.
    """
    epilog = """
    The --multi option can be repeated to include several do operations
    in the same netlink payload.
    """

    # add_help=False so that -h / --help can be placed in a named group
    parser = argparse.ArgumentParser(description=description,
                                     epilog=epilog, add_help=False)

    gen_group = parser.add_argument_group('General options')
    gen_group.add_argument('-h', '--help', action='help',
                           help='show this help message and exit')

    spec_group = parser.add_argument_group('Netlink family selection')
    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
    spec_sel.add_argument('--list-families', action='store_true',
                          help=('list Netlink families supported by YNL '
                                '(which have a spec available in the standard '
                                'system path)'))
    spec_sel.add_argument('--family', dest='family', type=str,
                          help='name of the Netlink FAMILY to use')
    spec_sel.add_argument('--spec', dest='spec', type=str,
                          help='full file path to the YAML spec file')

    ops_group = parser.add_argument_group('Operations')
    ops = ops_group.add_mutually_exclusive_group()
    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
                     help="Multi-message operation sequence (for nftables)")
    ops.add_argument('--list-ops', action='store_true',
                     help="List available --do and --dump operations")
    ops.add_argument('--list-msgs', action='store_true',
                     help="List all messages of the family (incl. notifications)")
    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
                     type=str, help='List attributes for a message / operation')
    ops.add_argument('--validate', action='store_true',
                     help="Validate the spec against schema and exit")

    io_group = parser.add_argument_group('Input / Output')
    io_group.add_argument('--json', dest='json_text', type=str,
                          help=('Specify attributes of the message to send '
                                'to the kernel in JSON format. Can be left out '
                                'if the message is expected to be empty.'))
    io_group.add_argument('--output-json', action='store_true',
                          help='Format output as JSON')

    ntf_group = parser.add_argument_group('Notifications')
    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
    ntf_group.add_argument('--duration', dest='duration', type=int,
                           help='when subscribed, watch for DURATION seconds')
    ntf_group.add_argument('--sleep', dest='duration', type=int,
                           help='alias for duration')

    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
                                        ('Extra flags to set in nlmsg_flags of '
                                         'the request, used mostly by older '
                                         'Classic Netlink families.'))
    # All flag options append to the same 'flags' list
    nlflags.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
    nlflags.add_argument('--excl', dest='flags', action='append_const',
                         const=Netlink.NLM_F_EXCL)
    nlflags.add_argument('--create', dest='flags', action='append_const',
                         const=Netlink.NLM_F_CREATE)
    nlflags.add_argument('--append', dest='flags', action='append_const',
                         const=Netlink.NLM_F_APPEND)

    schema_group = parser.add_argument_group('Development options')
    schema_group.add_argument('--schema', dest='schema', type=str,
                              help="JSON schema to validate the spec")
    schema_group.add_argument('--no-schema', action='store_true')

    dbg_group = parser.add_argument_group('Debug options')
    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
                           action='store', nargs='?', type=int, metavar='INT',
                           help="Length of buffers used for recv()")
    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)

    args = parser.parse_args()

    def output(msg):
        """Print msg as JSON if --output-json was given, else pretty-print"""
        if args.output_json:
            print(json.dumps(msg, cls=YnlEncoder))
        else:
            pprint.PrettyPrinter().pprint(msg)

    if args.list_families:
        for filename in sorted(os.listdir(spec_dir())):
            if filename.endswith('.yaml'):
                print(filename.removesuffix('.yaml'))
        return

    if args.no_schema:
        args.schema = ''

    attrs = {}
    if args.json_text:
        attrs = json.loads(args.json_text)

    if args.family:
        spec = f"{spec_dir()}/{args.family}.yaml"
    else:
        spec = args.spec
    if not os.path.isfile(spec):
        raise YnlException(f"Spec file {spec} does not exist")

    if args.validate:
        try:
            SpecFamily(spec, args.schema)
        except SpecException as error:
            print(error)
            sys.exit(1)
        return

    if args.family: # set behaviour when using installed specs
        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
            args.schema = '' # disable schema validation when installed
        if args.process_unknown is None:
            args.process_unknown = True

    ynl = YnlFamily(spec, args.schema, args.process_unknown,
                    recv_size=args.dbg_small_recv)
    if args.dbg_small_recv:
        ynl.set_recv_dbg(True)

    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
    if args.list_msgs:
        for op_name, op in ynl.msgs.items():
            print(op_name, " [", ", ".join(op.modes), "]")

    if args.list_attrs:
        op = ynl.msgs.get(args.list_attrs)
        if not op:
            print(f'Operation {args.list_attrs} not found')
            sys.exit(1)

        do_doc(ynl, op)

    try:
        if args.do:
            reply = ynl.do(args.do, attrs, args.flags)
            output(reply)
        if args.dump:
            reply = ynl.dump(args.dump, attrs)
            output(reply)
        if args.multi:
            # Each --multi carries its own JSON, the flags are shared
            ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
            reply = ynl.do_multi(ops)
            output(reply)

        if args.ntf:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
    except NlError as e:
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        # Ctrl-C ends the run without a traceback
        pass
    except BrokenPipeError:
        # The reader of our stdout went away (e.g. piping into head)
        pass


if __name__ == "__main__":
    main()