xref: /linux/tools/net/ynl/pyynl/cli.py (revision d8f87aa5fa0a4276491fa8ef436cd22605a3f9ba)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3
4"""
5YNL cli tool
6"""
7
8import argparse
9import json
10import os
11import pathlib
12import pprint
13import shutil
14import sys
15import textwrap
16
17# pylint: disable=no-name-in-module,wrong-import-position
18sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
19from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
20
# System-wide schema directory, used when the in-tree one is not present
SYS_SCHEMA_DIR = '/usr/share/ynl'
# In-tree schema directory, expressed relative to this script's directory
RELATIVE_SCHEMA_DIR = '../../../../Documentation/netlink'
23
24# pylint: disable=too-few-public-methods,too-many-locals
class Colors:
    """ANSI escape sequences for the font modifiers used by the CLI output"""
    # Appended after decorated text to switch every modifier off again
    RESET = '\x1b[0m'

    # Font modifiers; they can be concatenated to combine their effects
    BOLD = '\x1b[1m'
    ITALICS = '\x1b[3m'
    UNDERLINE = '\x1b[4m'
    INVERT = '\x1b[7m'
33
34
def color(text, modifiers):
    """Decorate text with ANSI modifiers when stdout is a terminal

    Args:
        text: the text to decorate
        modifiers: a single escape sequence string, or a list of them

    Returns:
        The text wrapped in the modifiers and the reset sequence if
        stdout is a TTY, otherwise the text as it was passed in
    """
    if not sys.stdout.isatty():
        return text
    # join() glues a list of codes together and leaves a lone string intact
    codes = "".join(modifiers)
    return f"{codes}{text}{Colors.RESET}"
46
def schema_dir():
    """
    Locate the schema directory to use.

    The in-tree directory (resolved relative to this script) wins when it
    exists, the system schema directory is the fallback.

    Raises:
        YnlException: neither of the two directories exists
    """
    here = os.path.dirname(os.path.abspath(__file__))
    in_tree = os.path.abspath(f"{here}/{RELATIVE_SCHEMA_DIR}")
    if os.path.isdir(in_tree):
        return in_tree
    if os.path.isdir(SYS_SCHEMA_DIR):
        return SYS_SCHEMA_DIR
    raise YnlException(f"Schema directory {SYS_SCHEMA_DIR} does not exist")
59
def spec_dir():
    """
    Locate the spec directory, which is the 'specs' subdirectory of the
    effective schema directory.

    Raises:
        YnlException: the 'specs' subdirectory does not exist
    """
    candidate = schema_dir() + '/specs'
    if os.path.isdir(candidate):
        return candidate
    raise YnlException(f"Spec directory {candidate} does not exist")
69
70
class YnlEncoder(json.JSONEncoder):
    """JSON encoder which also understands the bytes and set values in ynl output"""
    def default(self, o):
        """Convert bytes to a hex string and a set to a list, defer the rest"""
        if isinstance(o, bytes):
            return o.hex()
        if isinstance(o, set):
            return list(o)
        # Anything else is left to the base class, which raises TypeError
        return super().default(o)
79
80
def _print_attr_list(ynl, attr_names, attr_set, indent, open_sets):
    """Worker for print_attr_list() which tracks the sets being expanded.

    open_sets is a tuple of the attribute sets on the current expansion
    path (compared by identity); a nested set which is already on the path
    is referenced by name but not expanded again.
    """
    prefix = ' ' * indent
    for attr_name in attr_names:
        # Names which the attribute set does not define are skipped
        if attr_name not in attr_set.attrs:
            continue

        attr = attr_set.attrs[attr_name]
        attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
        if 'enum' in attr.yaml:
            enum_name = attr.yaml['enum']
            attr_info += f" (enum: {enum_name})"
            # Print enum values if available
            if enum_name in ynl.consts:
                const = ynl.consts[enum_name]
                enum_values = list(const.entries.keys())
                type_fmted = color(const.type.capitalize(), Colors.ITALICS)
                attr_info += f"\n{prefix}  {type_fmted}: {', '.join(enum_values)}"

        # Show the name of the nested attribute set, it is expanded below
        nested_set_name = None
        if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
            nested_set_name = attr.yaml['nested-attributes']
            attr_info += f" -> {nested_set_name}"

        if attr.yaml.get('doc'):
            # Wrap the documentation to the terminal, indented under the attr
            doc_prefix = prefix + ' ' * 4
            term_width = shutil.get_terminal_size().columns
            doc_text = textwrap.fill(attr.yaml['doc'], width=term_width,
                                     initial_indent=doc_prefix,
                                     subsequent_indent=doc_prefix)
            attr_info += f"\n{doc_text}"
        print(attr_info)

        # Recursively show nested attributes (None is never a known set)
        if nested_set_name not in ynl.attr_sets:
            continue
        nested_set = ynl.attr_sets[nested_set_name]
        # A set which (directly or indirectly) nests itself would otherwise
        # be expanded until the interpreter hits its recursion limit
        if any(nested_set is seen for seen in open_sets):
            continue
        # Filter out 'unspec' and other unused attrs
        nested_names = [n for n in nested_set.attrs.keys()
                        if nested_set.attrs[n].type != 'unused']
        if nested_names:
            _print_attr_list(ynl, nested_names, nested_set, indent + 4,
                             open_sets + (nested_set,))


def print_attr_list(ynl, attr_names, attr_set, indent=2):
    """Print a list of attributes with their types and documentation.

    For each name found in attr_set the type is printed, followed by the
    enum (and its values when the enum is known to the family), the name
    of the nested attribute set and the wrapped documentation text when
    present. Nested attribute sets are expanded recursively, each level
    indented by four more spaces; a set which is already being expanded
    is not expanded again, so self-referencing specs terminate.

    Args:
        ynl: family object, its consts and attr_sets are used for lookups
        attr_names: names of the attributes to print
        attr_set: attribute set in which the names are looked up
        indent: number of spaces in front of each attribute line
    """
    _print_attr_list(ynl, attr_names, attr_set, indent, (attr_set,))
121
122
def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None):
    """Print the request and reply attributes of one mode of an operation.

    When consistent_dd_reply is set the reply attributes are not printed
    for "do", and are printed under a combined 'Do and Dump' title for
    "dump".
    """
    mode_title = mode.capitalize()

    if 'attributes' in mode_spec.get('request', ()):
        print(f'\n{mode_title} request attributes:')
        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)

    if 'attributes' not in mode_spec.get('reply', ()):
        return

    title = mode_title
    if consistent_dd_reply:
        # The "do" reply is skipped, the "dump" pass prints it for both
        combined_titles = {'do': None, 'dump': 'Do and Dump'}
        title = combined_titles.get(mode, mode_title)
    if title:
        print(f'\n{title} reply attributes:')
        print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)
141
142
def do_doc(ynl, op):
    """Handle --list-attrs $op, print the attr information to stdout

    Prints the operation name and its documentation (when the spec has
    one), then the request / reply attributes of the do and dump modes,
    the event attributes, the attributes of a notification (taken from the
    reply of the operation the notification refers to) and finally the
    multicast group, each only when present in the spec.

    Args:
        ynl: family object, used to look up the operation referenced by a
            notification and passed on to the attribute printers
        op: the message / operation to document
    """
    print(f'Operation: {color(op.name, Colors.BOLD)}')
    # Specs loaded without schema validation may lack the doc property
    if 'doc' in op.yaml:
        print(op.yaml['doc'])

    # Identical do and dump replies are printed once, under a shared title
    consistent_dd_reply = (
        'do' in op.yaml and 'dump' in op.yaml and
        'reply' in op.yaml['do'] and
        op.yaml['do']['reply'] == op.yaml['dump'].get('reply')
    )

    for mode in ['do', 'dump']:
        if mode in op.yaml:
            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set,
                             consistent_dd_reply=consistent_dd_reply)

    if 'attributes' in op.yaml.get('event', {}):
        print('\nEvent attributes:')
        print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set)

    if 'notify' in op.yaml:
        # A notification carries the reply of the operation it refers to.
        # Prefer the do reply and fall back to the dump reply; tolerate an
        # unknown operation or a mode without reply attributes instead of
        # failing with an AttributeError / KeyError.
        ref_op = ynl.msgs.get(op.yaml['notify'])
        ref_attrs = None
        if ref_op is not None:
            for mode in ('do', 'dump'):
                ref_reply = (ref_op.yaml.get(mode) or {}).get('reply') or {}
                if 'attributes' in ref_reply:
                    ref_attrs = ref_reply['attributes']
                    break
        if ref_attrs is not None:
            print('\nNotification attributes:')
            print_attr_list(ynl, ref_attrs, op.attr_set)

    if 'mcgrp' in op.yaml:
        print(f"\nMulticast group: {op.yaml['mcgrp']}")
173
174
175# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
    """YNL cli tool

    Parse the command line, locate the spec (by family name or by path)
    and perform the requested actions: list the families, validate the
    spec, list operations / messages / attributes, send do, dump or multi
    requests and print the replies, and print notifications when
    subscribed to a multicast group.

    Exits with status 1 when --validate finds a spec error, when the
    message given to --list-attrs is not found, and when the request
    fails with NlError.

    Raises:
        YnlException: the spec file does not exist
    """

    description = """
    YNL CLI utility - a general purpose netlink utility that uses YAML
    specs to drive protocol encoding and decoding.
    """
    epilog = """
    The --multi option can be repeated to include several do operations
    in the same netlink payload.
    """

    # add_help=False so that -h/--help can be placed in a group of our own
    parser = argparse.ArgumentParser(description=description,
                                     epilog=epilog, add_help=False)

    gen_group = parser.add_argument_group('General options')
    gen_group.add_argument('-h', '--help', action='help',
                           help='show this help message and exit')

    spec_group = parser.add_argument_group('Netlink family selection')
    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
    spec_sel.add_argument('--list-families', action='store_true',
                          help=('list Netlink families supported by YNL '
                                '(which have a spec available in the standard '
                                'system path)'))
    spec_sel.add_argument('--family', dest='family', type=str,
                          help='name of the Netlink FAMILY to use')
    spec_sel.add_argument('--spec', dest='spec', type=str,
                          help='full file path to the YAML spec file')

    ops_group = parser.add_argument_group('Operations')
    ops = ops_group.add_mutually_exclusive_group()
    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
                     help="Multi-message operation sequence (for nftables)")
    ops.add_argument('--list-ops', action='store_true',
                     help="List available --do and --dump operations")
    ops.add_argument('--list-msgs', action='store_true',
                     help="List all messages of the family (incl. notifications)")
    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
                     type=str, help='List attributes for a message / operation')
    ops.add_argument('--validate', action='store_true',
                     help="Validate the spec against schema and exit")

    io_group = parser.add_argument_group('Input / Output')
    io_group.add_argument('--json', dest='json_text', type=str,
                          help=('Specify attributes of the message to send '
                                'to the kernel in JSON format. Can be left out '
                                'if the message is expected to be empty.'))
    io_group.add_argument('--output-json', action='store_true',
                          help='Format output as JSON')

    ntf_group = parser.add_argument_group('Notifications')
    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
    ntf_group.add_argument('--duration', dest='duration', type=int,
                           help='when subscribed, watch for DURATION seconds')
    ntf_group.add_argument('--sleep', dest='duration', type=int,
                           help='alias for duration')

    # All the flag options append their constant to the same 'flags' list
    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
                                        ('Extra flags to set in nlmsg_flags of '
                                         'the request, used mostly by older '
                                         'Classic Netlink families.'))
    nlflags.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
    nlflags.add_argument('--excl', dest='flags', action='append_const',
                         const=Netlink.NLM_F_EXCL)
    nlflags.add_argument('--create', dest='flags', action='append_const',
                         const=Netlink.NLM_F_CREATE)
    nlflags.add_argument('--append', dest='flags', action='append_const',
                         const=Netlink.NLM_F_APPEND)

    schema_group = parser.add_argument_group('Development options')
    schema_group.add_argument('--schema', dest='schema', type=str,
                              help="JSON schema to validate the spec")
    schema_group.add_argument('--no-schema', action='store_true')

    dbg_group = parser.add_argument_group('Debug options')
    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
                           action='store', nargs='?', type=int, metavar='INT',
                           help="Length of buffers used for recv()")
    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)

    args = parser.parse_args()

    def output(msg):
        """Print a reply or notification, as JSON if --output-json is set"""
        if args.output_json:
            print(json.dumps(msg, cls=YnlEncoder))
        else:
            pprint.PrettyPrinter().pprint(msg)

    # Listing the families needs the spec directory only, not a spec
    if args.list_families:
        for filename in sorted(os.listdir(spec_dir())):
            if filename.endswith('.yaml'):
                print(filename.removesuffix('.yaml'))
        return

    # An empty schema name is how schema validation is switched off
    if args.no_schema:
        args.schema = ''

    attrs = {}
    if args.json_text:
        attrs = json.loads(args.json_text)

    if args.family:
        spec = f"{spec_dir()}/{args.family}.yaml"
    else:
        spec = args.spec
    if not os.path.isfile(spec):
        raise YnlException(f"Spec file {spec} does not exist")

    if args.validate:
        try:
            SpecFamily(spec, args.schema)
        except SpecException as error:
            print(error)
            sys.exit(1)
        return

    if args.family: # set behaviour when using installed specs
        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
            args.schema = '' # disable schema validation when installed
        if args.process_unknown is None:
            args.process_unknown = True

    ynl = YnlFamily(spec, args.schema, args.process_unknown,
                    recv_size=args.dbg_small_recv)
    if args.dbg_small_recv:
        ynl.set_recv_dbg(True)

    # Subscribe before sending any request, notifications are polled last
    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
    if args.list_msgs:
        for op_name, op in ynl.msgs.items():
            print(op_name, " [", ", ".join(op.modes), "]")

    if args.list_attrs:
        op = ynl.msgs.get(args.list_attrs)
        if not op:
            print(f'Operation {args.list_attrs} not found')
            sys.exit(1)

        do_doc(ynl, op)

    try:
        if args.do:
            reply = ynl.do(args.do, attrs, args.flags)
            output(reply)
        if args.dump:
            reply = ynl.dump(args.dump, attrs)
            output(reply)
        if args.multi:
            # Each --multi carries its own JSON, the flags are shared
            multi_ops = [(item[0], json.loads(item[1]), args.flags or [])
                         for item in args.multi]
            reply = ynl.do_multi(multi_ops)
            output(reply)

        if args.ntf:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
    except NlError as e:
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        pass
    except BrokenPipeError:
        # The reader of our output went away (e.g. a pipe into `head`).
        # Python flushes stdout once more at exit; point fd 1 at devnull
        # so that the final flush does not hit a second BrokenPipeError
        # and report it on stderr.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        os.close(devnull)
348
349
# Run the CLI only when this file is executed as a script, not on import
if __name__ == "__main__":
    main()
352