xref: /freebsd/crypto/krb5/src/tests/jsonwalker.py (revision 7f2fe78b9dd5f51c821d771b63d2e096f6fd49e9)
1import sys
2import json
3from collections import defaultdict
4from optparse import OptionParser
5
class Parser(object):
    """
    Checks JSON log messages against a dictionary of default values.

    Nested dictionaries are flattened into dotted attribute paths
    (e.g. {'a': {'b': 1}} becomes 'a.b').  An attribute counts as "set"
    when at least one message carries it with a value that differs from
    its default.
    """

    # Fallback defaults by value type, used when the defaults dictionary
    # registers an attribute with a null (None) default.  Types missing
    # from this table fall back to the type's zero-argument constructor.
    DEFAULTS = {int:0,
                str:'',
                list:[]}

    def __init__(self, defconf=None):
        """
        Parameters
        ----------
        defconf : optional (possibly nested) dictionary of default values;
                  when omitted, no attributes are registered
        """
        self.defaults = None
        if defconf is not None:
            self.defaults = self.flatten(defconf)

    def run(self, logs, verbose=None):
        """
        Parses the logs and exits with status 1 if any registered
        attribute was never set to a non-default value.

        Parameters
        ----------
        logs : iterable of dictionaries, one per log message
        verbose : currently unused; kept for interface compatibility
        """
        # Without a defaults dictionary there is nothing to verify.
        defaults = self.defaults or {}
        result = self.parse(logs)
        # parse() only records registered paths, so any registered path
        # absent from the result was never set.
        missing = set(defaults).difference(result)
        if missing:
            print('Test failed.')
            print('The following attributes were not set:')
            # Sorted so that the report is deterministic.
            for it in sorted(missing):
                print(it)
            sys.exit(1)

    def flatten(self, defaults):
        """
        Flattens paths to attributes.

        Parameters
        ----------
        defaults : a dictionary populated with default values

        Returns :
        dict : with flattened attributes
        """
        result = dict()
        for path, value in self._walk(defaults):
            if path in result:
                print('Warning: attribute path %s already exists' % path)
            result[path] = value

        return result

    def parse(self, logs):
        """
        Collects the non-default values seen for registered attributes.

        Parameters
        ----------
        logs : iterable of dictionaries, one per log message

        Returns :
        defaultdict(list) : maps each registered attribute path to the
        list of non-default values found for it, in message order
        """
        defaults = self.defaults or {}
        result = defaultdict(list)
        for msg in logs:
            # each message is treated as a dictionary of dictionaries
            for a, v in self._walk(msg):
                # only paths registered in defaults are of interest
                if a not in defaults:
                    continue
                dv = defaults[a]
                if dv is None:
                    # no explicit default: determine it by value type
                    if v is None:
                        print('Warning: attribute %s is set to None' % a)
                        continue
                    vtype = type(v)
                    if vtype in self.DEFAULTS:
                        dv = self.DEFAULTS[vtype]
                    else:
                        # e.g. float -> 0.0, bool -> False
                        dv = vtype()
                # by now we have default value
                if v != dv:
                    # test passed
                    result[a].append(v)
        return result

    def _walk(self, adict):
        """
        Generator that works through dictionary.

        Yields (path, value) pairs for every non-dictionary value, where
        path joins the keys leading to the value with '.'.
        """
        for a, v in adict.items():
            if isinstance(v, dict):
                for (attrpath, u) in self._walk(v):
                    yield (a + '.' + attrpath, u)
            else:
                yield (a, v)
76
77
if __name__ == '__main__':
    # Command-line entry point: reads a log file holding one JSON document
    # per line and, optionally, a JSON file of default values, then checks
    # the log against the defaults.

    parser = OptionParser()
    parser.add_option("-i", "--logfile", dest="filename",
                  help="input log file in json fmt", metavar="FILE")
    parser.add_option("-d", "--defaults", dest="defaults",
                  help="dictionary with defaults", metavar="FILE")

    (options, args) = parser.parse_args()
    if options.filename is None:
        print('Input file in JSON format is required')
        # A missing required argument is an error: report failure.
        sys.exit(1)

    content = list()
    with open(options.filename, 'r') as f:
        for l in f:
            l = l.strip()
            # Blank lines carry no message; json.loads would reject them.
            if l:
                content.append(json.loads(l))

    defaults = None
    if options.defaults is not None:
        with open(options.defaults, 'r') as f:
            defaults = json.load(f)

    # run test; Parser.run exits with status 1 itself on failure
    p = Parser(defaults)
    p.run(content)
    sys.exit(0)
106