xref: /linux/tools/net/ynl/lib/nlspec.py (revision 3f3a1675b731e532d479e65570f2904878fbd9f0)
1# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2
3import collections
4import importlib
5import os
6import yaml
7
8
9# To be loaded dynamically as needed
10jsonschema = None
11
12
13class SpecElement:
14    """Netlink spec element.
15
16    Abstract element of the Netlink spec. Implements the dictionary interface
17    for access to the raw spec. Supports iterative resolution of dependencies
18    across elements and class inheritance levels. The elements of the spec
19    may refer to each other, and although loops should be very rare, having
20    to maintain correct ordering of instantiation is painful, so the resolve()
21    method should be used to perform parts of init which require access to
22    other parts of the spec.
23
24    Attributes:
25        yaml        raw spec as loaded from the spec file
26        family      back reference to the full family
27
28        name        name of the entity as listed in the spec (optional)
29        ident_name  name which can be safely used as identifier in code (optional)
30    """
31    def __init__(self, family, yaml):
32        self.yaml = yaml
33        self.family = family
34
35        if 'name' in self.yaml:
36            self.name = self.yaml['name']
37            self.ident_name = self.name.replace('-', '_')
38
39        self._super_resolved = False
40        family.add_unresolved(self)
41
42    def __getitem__(self, key):
43        return self.yaml[key]
44
45    def __contains__(self, key):
46        return key in self.yaml
47
48    def get(self, key, default=None):
49        return self.yaml.get(key, default)
50
51    def resolve_up(self, up):
52        if not self._super_resolved:
53            up.resolve()
54            self._super_resolved = True
55
56    def resolve(self):
57        pass
58
59
60class SpecEnumEntry(SpecElement):
61    """ Entry within an enum declared in the Netlink spec.
62
63    Attributes:
64        doc         documentation string
65        enum_set    back reference to the enum
66        value       numerical value of this enum (use accessors in most situations!)
67
68    Methods:
69        raw_value   raw value, i.e. the id in the enum, unlike user value which is a mask for flags
70        user_value   user value, same as raw value for enums, for flags it's the mask
71    """
72    def __init__(self, enum_set, yaml, prev, value_start):
73        if isinstance(yaml, str):
74            yaml = {'name': yaml}
75        super().__init__(enum_set.family, yaml)
76
77        self.doc = yaml.get('doc', '')
78        self.enum_set = enum_set
79
80        if 'value' in yaml:
81            self.value = yaml['value']
82        elif prev:
83            self.value = prev.value + 1
84        else:
85            self.value = value_start
86
87    def has_doc(self):
88        return bool(self.doc)
89
90    def raw_value(self):
91        return self.value
92
93    def user_value(self):
94        if self.enum_set['type'] == 'flags':
95            return 1 << self.value
96        else:
97            return self.value
98
99
100class SpecEnumSet(SpecElement):
101    """ Enum type
102
103    Represents an enumeration (list of numerical constants)
104    as declared in the "definitions" section of the spec.
105
106    Attributes:
107        type            enum or flags
108        entries         entries by name
109        entries_by_val  entries by value
110    Methods:
111        get_mask      for flags compute the mask of all defined values
112    """
113    def __init__(self, family, yaml):
114        super().__init__(family, yaml)
115
116        self.type = yaml['type']
117
118        prev_entry = None
119        value_start = self.yaml.get('value-start', 0)
120        self.entries = dict()
121        self.entries_by_val = dict()
122        for entry in self.yaml['entries']:
123            e = self.new_entry(entry, prev_entry, value_start)
124            self.entries[e.name] = e
125            self.entries_by_val[e.raw_value()] = e
126            prev_entry = e
127
128    def new_entry(self, entry, prev_entry, value_start):
129        return SpecEnumEntry(self, entry, prev_entry, value_start)
130
131    def has_doc(self):
132        if 'doc' in self.yaml:
133            return True
134        for entry in self.entries.values():
135            if entry.has_doc():
136                return True
137        return False
138
139    def get_mask(self):
140        mask = 0
141        for e in self.entries.values():
142            mask += e.user_value()
143        return mask
144
145
146class SpecAttr(SpecElement):
147    """ Single Netlink atttribute type
148
149    Represents a single attribute type within an attr space.
150
151    Attributes:
152        value      numerical ID when serialized
153        attr_set   Attribute Set containing this attr
154    """
155    def __init__(self, family, attr_set, yaml, value):
156        super().__init__(family, yaml)
157
158        self.value = value
159        self.attr_set = attr_set
160        self.is_multi = yaml.get('multi-attr', False)
161
162
163class SpecAttrSet(SpecElement):
164    """ Netlink Attribute Set class.
165
166    Represents a ID space of attributes within Netlink.
167
168    Note that unlike other elements, which expose contents of the raw spec
169    via the dictionary interface Attribute Set exposes attributes by name.
170
171    Attributes:
172        attrs      ordered dict of all attributes (indexed by name)
173        attrs_by_val  ordered dict of all attributes (indexed by value)
174        subset_of  parent set if this is a subset, otherwise None
175    """
176    def __init__(self, family, yaml):
177        super().__init__(family, yaml)
178
179        self.subset_of = self.yaml.get('subset-of', None)
180
181        self.attrs = collections.OrderedDict()
182        self.attrs_by_val = collections.OrderedDict()
183
184        if self.subset_of is None:
185            val = 1
186            for elem in self.yaml['attributes']:
187                if 'value' in elem:
188                    val = elem['value']
189
190                attr = self.new_attr(elem, val)
191                self.attrs[attr.name] = attr
192                self.attrs_by_val[attr.value] = attr
193                val += 1
194        else:
195            real_set = family.attr_sets[self.subset_of]
196            for elem in self.yaml['attributes']:
197                attr = real_set[elem['name']]
198                self.attrs[attr.name] = attr
199                self.attrs_by_val[attr.value] = attr
200
201    def new_attr(self, elem, value):
202        return SpecAttr(self.family, self, elem, value)
203
204    def __getitem__(self, key):
205        return self.attrs[key]
206
207    def __contains__(self, key):
208        return key in self.attrs
209
210    def __iter__(self):
211        yield from self.attrs
212
213    def items(self):
214        return self.attrs.items()
215
216
217class SpecOperation(SpecElement):
218    """Netlink Operation
219
220    Information about a single Netlink operation.
221
222    Attributes:
223        value       numerical ID when serialized, None if req/rsp values differ
224
225        req_value   numerical ID when serialized, user -> kernel
226        rsp_value   numerical ID when serialized, user <- kernel
227        is_call     bool, whether the operation is a call
228        is_async    bool, whether the operation is a notification
229        is_resv     bool, whether the operation does not exist (it's just a reserved ID)
230        attr_set    attribute set name
231
232        yaml        raw spec as loaded from the spec file
233    """
234    def __init__(self, family, yaml, req_value, rsp_value):
235        super().__init__(family, yaml)
236
237        self.value = req_value if req_value == rsp_value else None
238        self.req_value = req_value
239        self.rsp_value = rsp_value
240
241        self.is_call = 'do' in yaml or 'dump' in yaml
242        self.is_async = 'notify' in yaml or 'event' in yaml
243        self.is_resv = not self.is_async and not self.is_call
244
245        # Added by resolve:
246        self.attr_set = None
247        delattr(self, "attr_set")
248
249    def resolve(self):
250        self.resolve_up(super())
251
252        if 'attribute-set' in self.yaml:
253            attr_set_name = self.yaml['attribute-set']
254        elif 'notify' in self.yaml:
255            msg = self.family.msgs[self.yaml['notify']]
256            attr_set_name = msg['attribute-set']
257        elif self.is_resv:
258            attr_set_name = ''
259        else:
260            raise Exception(f"Can't resolve attribute set for op '{self.name}'")
261        if attr_set_name:
262            self.attr_set = self.family.attr_sets[attr_set_name]
263
264
265class SpecFamily(SpecElement):
266    """ Netlink Family Spec class.
267
268    Netlink family information loaded from a spec (e.g. in YAML).
269    Takes care of unfolding implicit information which can be skipped
270    in the spec itself for brevity.
271
272    The class can be used like a dictionary to access the raw spec
273    elements but that's usually a bad idea.
274
275    Attributes:
276        proto     protocol type (e.g. genetlink)
277        license   spec license (loaded from an SPDX tag on the spec)
278
279        attr_sets  dict of attribute sets
280        msgs       dict of all messages (index by name)
281        msgs_by_value  dict of all messages (indexed by name)
282        ops        dict of all valid requests / responses
283        consts     dict of all constants/enums
284    """
285    def __init__(self, spec_path, schema_path=None):
286        with open(spec_path, "r") as stream:
287            prefix = '# SPDX-License-Identifier: '
288            first = stream.readline().strip()
289            if not first.startswith(prefix):
290                raise Exception('SPDX license tag required in the spec')
291            self.license = first[len(prefix):]
292
293            stream.seek(0)
294            spec = yaml.safe_load(stream)
295
296        self._resolution_list = []
297
298        super().__init__(self, spec)
299
300        self.proto = self.yaml.get('protocol', 'genetlink')
301
302        if schema_path is None:
303            schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
304        if schema_path:
305            global jsonschema
306
307            with open(schema_path, "r") as stream:
308                schema = yaml.safe_load(stream)
309
310            if jsonschema is None:
311                jsonschema = importlib.import_module("jsonschema")
312
313            jsonschema.validate(self.yaml, schema)
314
315        self.attr_sets = collections.OrderedDict()
316        self.msgs = collections.OrderedDict()
317        self.req_by_value = collections.OrderedDict()
318        self.rsp_by_value = collections.OrderedDict()
319        self.ops = collections.OrderedDict()
320        self.consts = collections.OrderedDict()
321
322        last_exception = None
323        while len(self._resolution_list) > 0:
324            resolved = []
325            unresolved = self._resolution_list
326            self._resolution_list = []
327
328            for elem in unresolved:
329                try:
330                    elem.resolve()
331                except (KeyError, AttributeError) as e:
332                    self._resolution_list.append(elem)
333                    last_exception = e
334                    continue
335
336                resolved.append(elem)
337
338            if len(resolved) == 0:
339                raise last_exception
340
341    def new_enum(self, elem):
342        return SpecEnumSet(self, elem)
343
344    def new_attr_set(self, elem):
345        return SpecAttrSet(self, elem)
346
347    def new_operation(self, elem, req_val, rsp_val):
348        return SpecOperation(self, elem, req_val, rsp_val)
349
350    def add_unresolved(self, elem):
351        self._resolution_list.append(elem)
352
353    def _dictify_ops_unified(self):
354        val = 1
355        for elem in self.yaml['operations']['list']:
356            if 'value' in elem:
357                val = elem['value']
358
359            op = self.new_operation(elem, val, val)
360            val += 1
361
362            self.msgs[op.name] = op
363
364    def _dictify_ops_directional(self):
365        req_val = rsp_val = 1
366        for elem in self.yaml['operations']['list']:
367            if 'notify' in elem:
368                if 'value' in elem:
369                    rsp_val = elem['value']
370                req_val_next = req_val
371                rsp_val_next = rsp_val + 1
372                req_val = None
373            elif 'do' in elem or 'dump' in elem:
374                mode = elem['do'] if 'do' in elem else elem['dump']
375
376                v = mode.get('request', {}).get('value', None)
377                if v:
378                    req_val = v
379                v = mode.get('reply', {}).get('value', None)
380                if v:
381                    rsp_val = v
382
383                rsp_inc = 1 if 'reply' in mode else 0
384                req_val_next = req_val + 1
385                rsp_val_next = rsp_val + rsp_inc
386            else:
387                raise Exception("Can't parse directional ops")
388
389            op = self.new_operation(elem, req_val, rsp_val)
390            req_val = req_val_next
391            rsp_val = rsp_val_next
392
393            self.msgs[op.name] = op
394
395    def resolve(self):
396        self.resolve_up(super())
397
398        definitions = self.yaml.get('definitions', [])
399        for elem in definitions:
400            if elem['type'] == 'enum' or elem['type'] == 'flags':
401                self.consts[elem['name']] = self.new_enum(elem)
402            else:
403                self.consts[elem['name']] = elem
404
405        for elem in self.yaml['attribute-sets']:
406            attr_set = self.new_attr_set(elem)
407            self.attr_sets[elem['name']] = attr_set
408
409        msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
410        if msg_id_model == 'unified':
411            self._dictify_ops_unified()
412        elif msg_id_model == 'directional':
413            self._dictify_ops_directional()
414
415        for op in self.msgs.values():
416            if op.req_value is not None:
417                self.req_by_value[op.req_value] = op
418            if op.rsp_value is not None:
419                self.rsp_by_value[op.rsp_value] = op
420            if not op.is_async and 'attribute-set' in op:
421                self.ops[op.name] = op
422