xref: /linux/tools/net/ynl/pyynl/cli.py (revision 37a93dd5c49b5fda807fd204edf2547c3493319c)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3
4"""
5YNL cli tool
6"""
7
8import argparse
9import json
10import os
11import pathlib
12import pprint
13import shutil
14import sys
15import textwrap
16
17# pylint: disable=no-name-in-module,wrong-import-position
18sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
19from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
20
21SYS_SCHEMA_DIR='/usr/share/ynl'
22RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
23
24# pylint: disable=too-few-public-methods,too-many-locals
25class Colors:
26    """ANSI color and font modifier codes"""
27    RESET = '\033[0m'
28
29    BOLD = '\033[1m'
30    ITALICS = '\033[3m'
31    UNDERLINE = '\033[4m'
32    INVERT = '\033[7m'
33
34
35def color(text, modifiers):
36    """Add color to text if output is a TTY
37
38    Returns:
39        Colored text if stdout is a TTY, otherwise plain text
40    """
41    if sys.stdout.isatty():
42        # Join the colors if they are a list, if it's a string this a noop
43        modifiers = "".join(modifiers)
44        return f"{modifiers}{text}{Colors.RESET}"
45    return text
46
47def term_width():
48    """ Get terminal width in columns (80 if stdout is not a terminal) """
49    return shutil.get_terminal_size().columns
50
51def schema_dir():
52    """
53    Return the effective schema directory, preferring in-tree before
54    system schema directory.
55    """
56    script_dir = os.path.dirname(os.path.abspath(__file__))
57    schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
58    if not os.path.isdir(schema_dir_):
59        schema_dir_ = SYS_SCHEMA_DIR
60    if not os.path.isdir(schema_dir_):
61        raise YnlException(f"Schema directory {schema_dir_} does not exist")
62    return schema_dir_
63
64def spec_dir():
65    """
66    Return the effective spec directory, relative to the effective
67    schema directory.
68    """
69    spec_dir_ = schema_dir() + '/specs'
70    if not os.path.isdir(spec_dir_):
71        raise YnlException(f"Spec directory {spec_dir_} does not exist")
72    return spec_dir_
73
74
75class YnlEncoder(json.JSONEncoder):
76    """A custom encoder for emitting JSON with ynl-specific instance types"""
77    def default(self, o):
78        if isinstance(o, bytes):
79            return bytes.hex(o)
80        if isinstance(o, set):
81            return list(o)
82        return json.JSONEncoder.default(self, o)
83
84
85def print_attr_list(ynl, attr_names, attr_set, indent=2):
86    """Print a list of attributes with their types and documentation."""
87    prefix = ' ' * indent
88    for attr_name in attr_names:
89        if attr_name in attr_set.attrs:
90            attr = attr_set.attrs[attr_name]
91            attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
92            if 'enum' in attr.yaml:
93                enum_name = attr.yaml['enum']
94                attr_info += f" (enum: {enum_name})"
95                # Print enum values if available
96                if enum_name in ynl.consts:
97                    const = ynl.consts[enum_name]
98                    enum_values = list(const.entries.keys())
99                    type_fmted = color(const.type.capitalize(), Colors.ITALICS)
100                    attr_info += f"\n{prefix}  {type_fmted}: {', '.join(enum_values)}"
101
102            # Show nested attributes reference and recursively display them
103            nested_set_name = None
104            if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
105                nested_set_name = attr.yaml['nested-attributes']
106                attr_info += f" -> {nested_set_name}"
107
108            if attr.yaml.get('doc'):
109                doc_prefix = prefix + ' ' * 4
110                doc_text = textwrap.fill(attr.yaml['doc'], width=term_width(),
111                                         initial_indent=doc_prefix,
112                                         subsequent_indent=doc_prefix)
113                attr_info += f"\n{doc_text}"
114            print(attr_info)
115
116            # Recursively show nested attributes
117            if nested_set_name in ynl.attr_sets:
118                nested_set = ynl.attr_sets[nested_set_name]
119                # Filter out 'unspec' and other unused attrs
120                nested_names = [n for n in nested_set.attrs.keys()
121                                if nested_set.attrs[n].type != 'unused']
122                if nested_names:
123                    print_attr_list(ynl, nested_names, nested_set, indent + 4)
124
125
126def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None):
127    """Print a given mode (do/dump/event/notify)."""
128    mode_title = mode.capitalize()
129
130    if 'request' in mode_spec and 'attributes' in mode_spec['request']:
131        print(f'\n{mode_title} request attributes:')
132        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)
133
134    if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
135        if consistent_dd_reply and mode == "do":
136            title = None  # Dump handling will print in combined format
137        elif consistent_dd_reply and mode == "dump":
138            title = 'Do and Dump'
139        else:
140            title = f'{mode_title}'
141        if title:
142            print(f'\n{title} reply attributes:')
143            print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)
144
145
146def do_doc(ynl, op):
147    """Handle --list-attrs $op, print the attr information to stdout"""
148    print(f'Operation: {color(op.name, Colors.BOLD)}')
149    print(op.yaml['doc'])
150
151    consistent_dd_reply = False
152    if 'do' in op.yaml and 'dump' in op.yaml and 'reply' in op.yaml['do'] and \
153       op.yaml['do']['reply'] == op.yaml['dump'].get('reply'):
154        consistent_dd_reply = True
155
156    for mode in ['do', 'dump']:
157        if mode in op.yaml:
158            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set,
159                             consistent_dd_reply=consistent_dd_reply)
160
161    if 'attributes' in op.yaml.get('event', {}):
162        print('\nEvent attributes:')
163        print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set)
164
165    if 'notify' in op.yaml:
166        mode_spec = op.yaml['notify']
167        ref_spec = ynl.msgs.get(mode_spec).yaml.get('do')
168        if not ref_spec:
169            ref_spec = ynl.msgs.get(mode_spec).yaml.get('dump')
170        if ref_spec:
171            print('\nNotification attributes:')
172            print_attr_list(ynl, ref_spec['reply']['attributes'], op.attr_set)
173
174    if 'mcgrp' in op.yaml:
175        print(f"\nMulticast group: {op.yaml['mcgrp']}")
176
177
178# pylint: disable=too-many-locals,too-many-branches,too-many-statements
179def main():
180    """YNL cli tool"""
181
182    description = """
183    YNL CLI utility - a general purpose netlink utility that uses YAML
184    specs to drive protocol encoding and decoding.
185    """
186    epilog = """
187    The --multi option can be repeated to include several do operations
188    in the same netlink payload.
189    """
190
191    parser = argparse.ArgumentParser(description=description,
192                                     epilog=epilog, add_help=False)
193
194    gen_group = parser.add_argument_group('General options')
195    gen_group.add_argument('-h', '--help', action='help',
196                           help='show this help message and exit')
197
198    spec_group = parser.add_argument_group('Netlink family selection')
199    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
200    spec_sel.add_argument('--list-families', action='store_true',
201                          help=('list Netlink families supported by YNL '
202                                '(which have a spec available in the standard '
203                                'system path)'))
204    spec_sel.add_argument('--family', dest='family', type=str,
205                          help='name of the Netlink FAMILY to use')
206    spec_sel.add_argument('--spec', dest='spec', type=str,
207                          help='full file path to the YAML spec file')
208
209    ops_group = parser.add_argument_group('Operations')
210    ops = ops_group.add_mutually_exclusive_group()
211    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
212    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
213    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
214                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
215                     help="Multi-message operation sequence (for nftables)")
216    ops.add_argument('--list-ops', action='store_true',
217                     help="List available --do and --dump operations")
218    ops.add_argument('--list-msgs', action='store_true',
219                     help="List all messages of the family (incl. notifications)")
220    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
221                     type=str, help='List attributes for a message / operation')
222    ops.add_argument('--validate', action='store_true',
223                     help="Validate the spec against schema and exit")
224
225    io_group = parser.add_argument_group('Input / Output')
226    io_group.add_argument('--json', dest='json_text', type=str,
227                          help=('Specify attributes of the message to send '
228                                'to the kernel in JSON format. Can be left out '
229                                'if the message is expected to be empty.'))
230    io_group.add_argument('--output-json', action='store_true',
231                          help='Format output as JSON')
232
233    ntf_group = parser.add_argument_group('Notifications')
234    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
235    ntf_group.add_argument('--duration', dest='duration', type=int,
236                           help='when subscribed, watch for DURATION seconds')
237    ntf_group.add_argument('--sleep', dest='duration', type=int,
238                           help='alias for duration')
239
240    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
241                                        ('Extra flags to set in nlmsg_flags of '
242                                         'the request, used mostly by older '
243                                         'Classic Netlink families.'))
244    nlflags.add_argument('--replace', dest='flags', action='append_const',
245                         const=Netlink.NLM_F_REPLACE)
246    nlflags.add_argument('--excl', dest='flags', action='append_const',
247                         const=Netlink.NLM_F_EXCL)
248    nlflags.add_argument('--create', dest='flags', action='append_const',
249                         const=Netlink.NLM_F_CREATE)
250    nlflags.add_argument('--append', dest='flags', action='append_const',
251                         const=Netlink.NLM_F_APPEND)
252
253    schema_group = parser.add_argument_group('Development options')
254    schema_group.add_argument('--schema', dest='schema', type=str,
255                              help="JSON schema to validate the spec")
256    schema_group.add_argument('--no-schema', action='store_true')
257
258    dbg_group = parser.add_argument_group('Debug options')
259    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
260                           action='store', nargs='?', type=int, metavar='INT',
261                           help="Length of buffers used for recv()")
262    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
263
264    args = parser.parse_args()
265
266    def output(msg):
267        if args.output_json:
268            print(json.dumps(msg, cls=YnlEncoder))
269        else:
270            pprint.pprint(msg, width=term_width(), compact=True)
271
272    if args.list_families:
273        for filename in sorted(os.listdir(spec_dir())):
274            if filename.endswith('.yaml'):
275                print(filename.removesuffix('.yaml'))
276        return
277
278    if args.no_schema:
279        args.schema = ''
280
281    attrs = {}
282    if args.json_text:
283        attrs = json.loads(args.json_text)
284
285    if args.family:
286        spec = f"{spec_dir()}/{args.family}.yaml"
287    else:
288        spec = args.spec
289    if not os.path.isfile(spec):
290        raise YnlException(f"Spec file {spec} does not exist")
291
292    if args.validate:
293        try:
294            SpecFamily(spec, args.schema)
295        except SpecException as error:
296            print(error)
297            sys.exit(1)
298        return
299
300    if args.family: # set behaviour when using installed specs
301        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
302            args.schema = '' # disable schema validation when installed
303        if args.process_unknown is None:
304            args.process_unknown = True
305
306    ynl = YnlFamily(spec, args.schema, args.process_unknown,
307                    recv_size=args.dbg_small_recv)
308    if args.dbg_small_recv:
309        ynl.set_recv_dbg(True)
310
311    if args.ntf:
312        ynl.ntf_subscribe(args.ntf)
313
314    if args.list_ops:
315        for op_name, op in ynl.ops.items():
316            print(op_name, " [", ", ".join(op.modes), "]")
317    if args.list_msgs:
318        for op_name, op in ynl.msgs.items():
319            print(op_name, " [", ", ".join(op.modes), "]")
320
321    if args.list_attrs:
322        op = ynl.msgs.get(args.list_attrs)
323        if not op:
324            print(f'Operation {args.list_attrs} not found')
325            sys.exit(1)
326
327        do_doc(ynl, op)
328
329    try:
330        if args.do:
331            reply = ynl.do(args.do, attrs, args.flags)
332            output(reply)
333        if args.dump:
334            reply = ynl.dump(args.dump, attrs)
335            output(reply)
336        if args.multi:
337            ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
338            reply = ynl.do_multi(ops)
339            output(reply)
340
341        if args.ntf:
342            for msg in ynl.poll_ntf(duration=args.duration):
343                output(msg)
344    except NlError as e:
345        print(e)
346        sys.exit(1)
347    except KeyboardInterrupt:
348        pass
349    except BrokenPipeError:
350        pass
351
352
353if __name__ == "__main__":
354    main()
355