xref: /linux/tools/net/ynl/pyynl/cli.py (revision 45b99bb464eb62da555ecbef31583d9701881d43)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3
4"""
5YNL cli tool
6"""
7
8import argparse
9import json
10import os
11import pathlib
12import pprint
13import shutil
14import sys
15import textwrap
16
17# pylint: disable=no-name-in-module,wrong-import-position
18sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
19from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
20
21SYS_SCHEMA_DIR='/usr/share/ynl'
22RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
23
24# pylint: disable=too-few-public-methods,too-many-locals
class Colors:
    """ANSI escape sequences used to style terminal output.

    Each constant is a complete escape sequence that can be written
    directly in front of the text it should affect.
    """
    # Clears every modifier that is currently active
    RESET = '\x1b[0m'

    # Font modifiers
    BOLD = '\x1b[1m'
    ITALICS = '\x1b[3m'
    UNDERLINE = '\x1b[4m'
    INVERT = '\x1b[7m'
33
34
def color(text, modifiers):
    """Wrap text in ANSI modifiers when stdout is a terminal.

    Args:
        text: The value to decorate.
        modifiers: A single escape sequence string, or an iterable of
            escape sequence strings which get concatenated.

    Returns:
        The text prefixed with the modifiers and followed by the reset
        sequence if stdout is a TTY, otherwise the text argument untouched.
    """
    if not sys.stdout.isatty():
        return text

    # "".join() glues a list of sequences together and hands a plain
    # string back unchanged, so both argument forms are accepted.
    codes = "".join(modifiers)
    return f"{codes}{text}{Colors.RESET}"
46
def schema_dir():
    """
    Locate the directory holding the netlink schemas.

    The in-tree location (RELATIVE_SCHEMA_DIR, resolved against the
    directory of this script) is preferred; SYS_SCHEMA_DIR is the fallback.

    Raises:
        YnlException: if neither location is an existing directory.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    in_tree = os.path.abspath(f"{here}/{RELATIVE_SCHEMA_DIR}")
    if os.path.isdir(in_tree):
        return in_tree

    if os.path.isdir(SYS_SCHEMA_DIR):
        return SYS_SCHEMA_DIR

    raise YnlException(f"Schema directory {SYS_SCHEMA_DIR} does not exist")
59
def spec_dir():
    """
    Locate the 'specs' sub-directory of the effective schema directory.

    Raises:
        YnlException: if the schema directory cannot be found, or if it
            has no 'specs' directory inside.
    """
    specs = f"{schema_dir()}/specs"
    if os.path.isdir(specs):
        return specs
    raise YnlException(f"Spec directory {specs} does not exist")
69
70
class YnlEncoder(json.JSONEncoder):
    """JSON encoder which also understands the extra types ynl produces"""
    def default(self, o):
        """Convert bytes to a hex string and sets to lists.

        Anything else is handed to the base class, which raises TypeError
        for objects it cannot serialize.
        """
        if isinstance(o, bytes):
            return o.hex()
        if isinstance(o, set):
            return list(o)
        return super().default(o)
79
80
def print_attr_list(ynl, attr_names, attr_set, indent=2):
    """Print the type, enum, nesting and doc details of the listed attributes.

    Args:
        ynl: Family object; its 'consts' and 'attr_sets' mappings are used
            to look up enum definitions and nested attribute sets.
        attr_names: Names of the attributes to describe. Names which are
            not present in attr_set are silently skipped.
        attr_set: Attribute set the names are looked up in.
        indent: Number of spaces to put in front of each entry. Nested
            attribute sets are printed 4 spaces deeper than their parent.
    """
    pad = ' ' * indent

    for name in attr_names:
        if name not in attr_set.attrs:
            continue
        attr = attr_set.attrs[name]

        # Collect the pieces of the entry, they are printed in one go
        pieces = [f'{pad}- {color(name, Colors.BOLD)}: {attr.type}']

        if 'enum' in attr.yaml:
            enum_name = attr.yaml['enum']
            pieces.append(f" (enum: {enum_name})")
            # The values can only be listed for enums the family defines
            if enum_name in ynl.consts:
                const = ynl.consts[enum_name]
                values = ', '.join(const.entries.keys())
                kind = color(const.type.capitalize(), Colors.ITALICS)
                pieces.append(f"\n{pad}  {kind}: {values}")

        # Remember the referenced set so it can be expanded after this entry
        nested_name = None
        if attr.type == 'nest' and 'nested-attributes' in attr.yaml:
            nested_name = attr.yaml['nested-attributes']
            pieces.append(f" -> {nested_name}")

        if attr.yaml.get('doc'):
            # Wrap the documentation to the terminal, 4 columns deeper
            # than the entry itself
            doc_pad = pad + ' ' * 4
            columns = shutil.get_terminal_size().columns
            wrapped = textwrap.fill(attr.yaml['doc'], width=columns,
                                    initial_indent=doc_pad,
                                    subsequent_indent=doc_pad)
            pieces.append(f"\n{wrapped}")

        print(''.join(pieces))

        # nested_name is None for non-nest attributes, which never matches
        if nested_name not in ynl.attr_sets:
            continue

        child_set = ynl.attr_sets[nested_name]
        # Skip the attributes marked as 'unused' (e.g. 'unspec')
        child_names = [child for child, child_attr in child_set.attrs.items()
                       if child_attr.type != 'unused']
        if child_names:
            print_attr_list(ynl, child_names, child_set, indent + 4)
121
122
def print_mode_attrs(ynl, mode, mode_spec, attr_set, print_request=True):
    """Print the attribute lists of one mode (do/dump/event/notify).

    Args:
        ynl: Family object, passed through to print_attr_list().
        mode: Name of the mode, capitalized to form the section titles.
        mode_spec: Mapping describing the mode; the 'request', 'reply' and
            'attributes' keys are consulted, all of them are optional.
        attr_set: Attribute set the attribute names belong to.
        print_request: When False the request section is left out.
    """
    title = mode.capitalize()

    has_request = 'request' in mode_spec and 'attributes' in mode_spec['request']
    if print_request and has_request:
        print(f'\n{title} request attributes:')
        print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)

    has_reply = 'reply' in mode_spec and 'attributes' in mode_spec['reply']
    if has_reply:
        print(f'\n{title} reply attributes:')
        print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)

    # Attributes listed directly under the mode, with no request / reply split
    if 'attributes' in mode_spec:
        print(f'\n{title} attributes:')
        print_attr_list(ynl, mode_spec['attributes'], attr_set)
138
139
def do_doc(ynl, op):
    """Handle --list-attrs $op, print the attr information to stdout

    Prints the operation name, its documentation (when the spec provides
    one), the attributes of every mode the operation defines and the
    multicast group, if any.

    Args:
        ynl: Family object; 'msgs' is used to resolve the operation a
            notification refers to.
        op: The operation / message to describe.
    """
    print(f'Operation: {color(op.name, Colors.BOLD)}')
    # Schema validation may be disabled when the family is loaded, so the
    # presence of 'doc' is not guaranteed here.
    if 'doc' in op.yaml:
        print(op.yaml['doc'])

    for mode in ['do', 'dump', 'event']:
        if mode in op.yaml:
            print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, True)

    if 'notify' in op.yaml:
        # A notification carries the reply of the 'do' of the operation it
        # names. msgs.get() returns None for an unknown name, in which case
        # there is nothing to list.
        ref_op = ynl.msgs.get(op.yaml['notify'])
        ref_spec = ref_op.yaml.get('do') if ref_op is not None else None
        if ref_spec:
            print_mode_attrs(ynl, 'notify', ref_spec, op.attr_set, False)

    if 'mcgrp' in op.yaml:
        print(f"\nMulticast group: {op.yaml['mcgrp']}")
157
158
159# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def _build_parser():
    """Create the argparse parser holding every option of the ynl CLI."""
    description = """
    YNL CLI utility - a general purpose netlink utility that uses YAML
    specs to drive protocol encoding and decoding.
    """
    epilog = """
    The --multi option can be repeated to include several do operations
    in the same netlink payload.
    """

    # The default help is disabled so that -h can be placed in a named group
    parser = argparse.ArgumentParser(description=description,
                                     epilog=epilog, add_help=False)

    gen_group = parser.add_argument_group('General options')
    gen_group.add_argument('-h', '--help', action='help',
                           help='show this help message and exit')

    # Exactly one way of selecting the family (or listing them) is required
    spec_group = parser.add_argument_group('Netlink family selection')
    spec_sel = spec_group.add_mutually_exclusive_group(required=True)
    spec_sel.add_argument('--list-families', action='store_true',
                          help=('list Netlink families supported by YNL '
                                '(which have a spec available in the standard '
                                'system path)'))
    spec_sel.add_argument('--family', dest='family', type=str,
                          help='name of the Netlink FAMILY to use')
    spec_sel.add_argument('--spec', dest='spec', type=str,
                          help='full file path to the YAML spec file')

    ops_group = parser.add_argument_group('Operations')
    ops = ops_group.add_mutually_exclusive_group()
    ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
    ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
    ops.add_argument('--multi', dest='multi', nargs=2, action='append',
                     metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
                     help="Multi-message operation sequence (for nftables)")
    ops.add_argument('--list-ops', action='store_true',
                     help="List available --do and --dump operations")
    ops.add_argument('--list-msgs', action='store_true',
                     help="List all messages of the family (incl. notifications)")
    ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
                     type=str, help='List attributes for a message / operation')
    ops.add_argument('--validate', action='store_true',
                     help="Validate the spec against schema and exit")

    io_group = parser.add_argument_group('Input / Output')
    io_group.add_argument('--json', dest='json_text', type=str,
                          help=('Specify attributes of the message to send '
                                'to the kernel in JSON format. Can be left out '
                                'if the message is expected to be empty.'))
    io_group.add_argument('--output-json', action='store_true',
                          help='Format output as JSON')

    ntf_group = parser.add_argument_group('Notifications')
    ntf_group.add_argument('--subscribe', dest='ntf', type=str)
    # --duration and --sleep store into the same destination
    ntf_group.add_argument('--duration', dest='duration', type=int,
                           help='when subscribed, watch for DURATION seconds')
    ntf_group.add_argument('--sleep', dest='duration', type=int,
                           help='alias for duration')

    # All flag options append their constant to the same 'flags' list
    nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
                                        ('Extra flags to set in nlmsg_flags of '
                                         'the request, used mostly by older '
                                         'Classic Netlink families.'))
    nlflags.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
    nlflags.add_argument('--excl', dest='flags', action='append_const',
                         const=Netlink.NLM_F_EXCL)
    nlflags.add_argument('--create', dest='flags', action='append_const',
                         const=Netlink.NLM_F_CREATE)
    nlflags.add_argument('--append', dest='flags', action='append_const',
                         const=Netlink.NLM_F_APPEND)

    schema_group = parser.add_argument_group('Development options')
    schema_group.add_argument('--schema', dest='schema', type=str,
                              help="JSON schema to validate the spec")
    schema_group.add_argument('--no-schema', action='store_true')

    dbg_group = parser.add_argument_group('Debug options')
    # 0 when the option is absent, 4000 when given without a value
    dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
                           action='store', nargs='?', type=int, metavar='INT',
                           help="Length of buffers used for recv()")
    dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)

    return parser


def main():
    """YNL cli tool

    Parses the command line, loads the selected family spec and performs
    the requested listing, validation, do / dump / multi operation and
    notification polling.

    Exits with status 1 when spec validation fails, when the message given
    to --list-attrs is unknown and when the kernel reports a netlink error.

    Raises:
        YnlException: if the spec file (or the spec directory) is missing.
    """
    args = _build_parser().parse_args()

    def output(msg):
        """Print a reply as JSON or pretty-printed Python, per --output-json."""
        if args.output_json:
            print(json.dumps(msg, cls=YnlEncoder))
        else:
            pprint.PrettyPrinter().pprint(msg)

    if args.list_families:
        for filename in sorted(os.listdir(spec_dir())):
            if filename.endswith('.yaml'):
                print(filename.removesuffix('.yaml'))
        return

    # An empty schema string disables validation, None selects the default
    if args.no_schema:
        args.schema = ''

    attrs = {}
    if args.json_text:
        attrs = json.loads(args.json_text)

    if args.family:
        spec = f"{spec_dir()}/{args.family}.yaml"
    else:
        spec = args.spec
    if not os.path.isfile(spec):
        raise YnlException(f"Spec file {spec} does not exist")

    if args.validate:
        try:
            SpecFamily(spec, args.schema)
        except SpecException as error:
            print(error)
            sys.exit(1)
        return

    if args.family: # set behaviour when using installed specs
        if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
            args.schema = '' # disable schema validation when installed
        if args.process_unknown is None:
            args.process_unknown = True

    ynl = YnlFamily(spec, args.schema, args.process_unknown,
                    recv_size=args.dbg_small_recv)
    if args.dbg_small_recv:
        ynl.set_recv_dbg(True)

    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
    if args.list_msgs:
        for op_name, op in ynl.msgs.items():
            print(op_name, " [", ", ".join(op.modes), "]")

    if args.list_attrs:
        op = ynl.msgs.get(args.list_attrs)
        if not op:
            print(f'Operation {args.list_attrs} not found')
            sys.exit(1)

        do_doc(ynl, op)

    try:
        if args.do:
            reply = ynl.do(args.do, attrs, args.flags)
            output(reply)
        if args.dump:
            reply = ynl.dump(args.dump, attrs)
            output(reply)
        if args.multi:
            # Every --multi occurrence is an (operation, JSON text) pair
            multi_ops = [(item[0], json.loads(item[1]), args.flags or [])
                         for item in args.multi]
            reply = ynl.do_multi(multi_ops)
            output(reply)

        if args.ntf:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
    except NlError as e:
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        pass
    except BrokenPipeError:
        # The reader of stdout went away (e.g. "| head"). Python flushes
        # stdout again at exit, which would fail once more and print an
        # "Exception ignored" message, so point stdout at devnull first.
        try:
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
            os.close(devnull)
        except (OSError, ValueError):
            # stdout has no usable file descriptor, nothing to redirect
            pass
332
333
334if __name__ == "__main__":
335    main()
336