# SPDX-License-Identifier: GPL-2.0
import re
import csv
import json
import argparse
import sys
from pathlib import Path
import subprocess


class TestError:
    def __init__(self, metric: list[str], wl: str, value: list[float], low: float, up=float('nan'), description=str()):
        self.metric: list = metric  # multiple metrics in relationship type tests
        self.workloads = [wl]  # multiple workloads possible
        self.collectedValue: list = value
        self.valueLowBound = low
        self.valueUpBound = up
        self.description = description

    def __repr__(self) -> str:
        if len(self.metric) > 1:
            return "\nMetric Relationship Error: \tThe collected value of metric {0}\n\
                \tis {1} in workload(s): {2} \n\
                \tbut expected value range is [{3}, {4}]\n\
                \tRelationship rule description: \'{5}\'".format(self.metric, self.collectedValue, self.workloads,
                                                                 self.valueLowBound, self.valueUpBound, self.description)
        elif len(self.collectedValue) == 0:
            return "\nNo Metric Value Error: \tMetric {0} returns with no value \n\
                \tworkload(s): {1}".format(self.metric, self.workloads)
        else:
            return "\nWrong Metric Value Error: \tThe collected value of metric {0}\n\
                \tis {1} in workload(s): {2}\n\
                \tbut expected value range is [{3}, {4}]"\
                .format(self.metric, self.collectedValue, self.workloads,
                        self.valueLowBound, self.valueUpBound)


class Validator:
    def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='', fullrulefname='',
                 workload='true', metrics='', cputype='cpu'):
        self.rulefname = rulefname
        self.reportfname = reportfname
        self.rules = None
        self.collectlist: str = metrics
        self.metrics = self.__set_metrics(metrics)
        self.skiplist = set()
        self.tolerance = t
        self.cputype = cputype

        self.workloads = [x for x in workload.split(",") if x]
        self.wlidx = 0  # idx of current workload
        self.allresults = dict()  # metric results of all workloads
        self.alltotalcnt = dict()
        self.allpassedcnt = dict()

        self.results = dict()  # metric results of current workload
        # vars for test pass/failure statistics
        # metrics with no results or negative results; a negative result counts as a failed test
        self.ignoremetrics = set()
        self.totalcnt = 0
        self.passedcnt = 0
        # vars for errors
        self.errlist = list()

        # vars for Rule Generator
        self.pctgmetrics = set()  # Percentage rule

        # vars for debug
        self.datafname = datafname
        self.debug = debug
        self.fullrulefname = fullrulefname

    def __set_metrics(self, metrics=''):
        if metrics != '':
            return set(metrics.split(","))
        else:
            return set()

    def read_json(self, filename: str) -> dict:
        try:
            with open(Path(filename).resolve(), "r") as f:
                data = json.loads(f.read())
        except OSError as e:
            print(f"Error when reading file: {e}")
            sys.exit()

        return data

    def json_dump(self, data, output_file):
        parent = Path(output_file).parent
        if not parent.exists():
            parent.mkdir(parents=True)

        with open(output_file, "w+") as output_file:
            json.dump(data,
                      output_file,
                      ensure_ascii=True,
                      indent=4)

    def get_results(self, idx: int = 0):
        return self.results.get(idx)

    def get_bounds(self, lb, ub, error, alias={}, ridx: int = 0) -> list:
        """
        Get bounds and tolerance from lb, ub, and error.
        If lb is missing, use 0.0; if ub is missing, use float('inf'); if error is missing, use self.tolerance.
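
        Illustrative example (hypothetical values): with lb='0', ub='100' and
        error=5, the tolerance is not denormalized (ub == 100), so check_bound()
        effectively accepts [-5, 105]; with ub='50' the same error denormalizes
        to 50 * 5 / 100 = 2.5, giving [-2.5, 52.5].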

        @param lb: str/float, lower bound
        @param ub: str/float, upper bound
        @param error: float/str, error tolerance
        @returns: lower bound, return inf if the lower bound is a metric value and is not collected
                  upper bound, return -1 if the upper bound is a metric value and is not collected
                  tolerance, denormalized based on the upper bound value
        """
        # init ubv and lbv to invalid values
        def get_bound_value(bound, initval, ridx):
            val = initval
            if isinstance(bound, int) or isinstance(bound, float):
                val = bound
            elif isinstance(bound, str):
                if bound == '':
                    val = float("inf")
                elif bound in alias:
                    vall = self.get_value(alias[bound], ridx)
                    if vall:
                        val = vall[0]
                elif bound.replace('.', '1').isdigit():
                    val = float(bound)
                else:
                    print("Wrong bound: {0}".format(bound))
            else:
                print("Wrong bound: {0}".format(bound))
            return val

        ubv = get_bound_value(ub, -1, ridx)
        lbv = get_bound_value(lb, float('inf'), ridx)
        t = get_bound_value(error, self.tolerance, ridx)

        # denormalize error threshold
        denormerr = t * ubv / 100 if ubv != 100 and ubv > 0 else t

        return lbv, ubv, denormerr

    def get_value(self, name: str, ridx: int = 0) -> list:
        """
        Get the value of the metric from self.results.
        If the result of this metric is not provided, the metric name will be added to self.ignoremetrics.
        All future test(s) on this metric will fail.

        @param name: name of the metric
        @returns: list with the value found in self.results; the list is empty when the value is not found.
        """
        results = []
        data = self.results[ridx] if ridx in self.results else self.results[0]
        if name not in self.ignoremetrics:
            if name in data:
                results.append(data[name])
            elif name.replace('.', '1').isdigit():
                results.append(float(name))
            else:
                self.ignoremetrics.add(name)
        return results

    def check_bound(self, val, lb, ub, err):
        return lb - err <= val <= ub + err

    # Positive Value Sanity check
    def pos_val_test(self):
        """
        Check if metric values are non-negative.
        One metric is counted as one test.
        Failure: when the metric value is negative or not provided.
        Metrics with a negative value will be added to self.ignoremetrics.
        """
        negmetric = dict()
        pcnt = 0
        tcnt = 0
        rerun = list()
        results = self.get_results()
        if not results:
            return
        for name, val in results.items():
            if val < 0:
                negmetric[name] = val
                rerun.append(name)
            else:
                pcnt += 1
            tcnt += 1
        # The first round of collect_perf() runs these metrics with the simple
        # workload "true". We give metrics a second chance with a longer workload
        # if fewer than 20 metrics failed the positive value test.
        if len(rerun) > 0 and len(rerun) < 20:
            second_results = dict()
            self.second_test(rerun, second_results)
            for name, val in second_results.items():
                if name not in negmetric:
                    continue
                if val >= 0:
                    del negmetric[name]
                    pcnt += 1

        if len(negmetric.keys()):
            self.ignoremetrics.update(negmetric.keys())
            self.errlist.extend(
                [TestError([m], self.workloads[self.wlidx], [negmetric[m]], 0) for m in negmetric.keys()])

        return

    def evaluate_formula(self, formula: str, alias: dict, ridx: int = 0):
        """
        Evaluate the value of the formula.
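
        Formulas are split on '+', '-', '*' and '/' and evaluated left to right;
        '*' and '/' are applied to the previous operand. For an illustrative
        formula 'a+b/c' with alias {'a': 'metric_x', 'b': 'metric_y',
        'c': 'metric_z'} (hypothetical metric names), the result is
        metric_x + metric_y / metric_z.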

        @param formula: the formula to be evaluated
        @param alias: dict mapping alias to metric name
        @returns: value of the formula if successful; -1 if one or more metric values are not provided
        """
        stack = []
        b = 0
        errs = []
        sign = "+"
        f = str()

        # TODO: support parenthesis?
        for i in range(len(formula)):
            if i+1 == len(formula) or formula[i] in ('+', '-', '*', '/'):
                s = alias[formula[b:i]] if i + \
                    1 < len(formula) else alias[formula[b:]]
                v = self.get_value(s, ridx)
                if not v:
                    errs.append(s)
                else:
                    f = f + "{0}(={1:.4f})".format(s, v[0])
                    if sign == "*":
                        stack[-1] = stack[-1] * v[0]
                    elif sign == "/":
                        stack[-1] = stack[-1] / v[0]
                    elif sign == '-':
                        stack.append(-v[0])
                    else:
                        stack.append(v[0])
                if i + 1 < len(formula):
                    sign = formula[i]
                    f += sign
                    b = i + 1

        if len(errs) > 0:
            return -1, "Metric value missing: "+','.join(errs)

        val = sum(stack)
        return val, f

    # Relationships Tests
    def relationship_test(self, rule: dict):
        """
        Validate if the metrics follow the required relationship in the rule,
        e.g. lower_bound <= eval(formula) <= upper_bound.
        One rule is counted as one test.
        Failure: when one or more metric result(s) are not provided, or when the formula evaluates outside of the upper/lower bounds.

        @param rule: dict with metric name(+alias), formula, and the required upper and lower bounds.
        """
        alias = dict()
        for m in rule['Metrics']:
            alias[m['Alias']] = m['Name']
        lbv, ubv, t = self.get_bounds(
            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'], alias, ridx=rule['RuleIndex'])
        val, f = self.evaluate_formula(
            rule['Formula'], alias, ridx=rule['RuleIndex'])

        lb = rule['RangeLower']
        ub = rule['RangeUpper']
        if isinstance(lb, str):
            if lb in alias:
                lb = alias[lb]
        if isinstance(ub, str):
            if ub in alias:
                ub = alias[ub]

        if val == -1:
            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [],
                                          lb, ub, rule['Description']))
        elif not self.check_bound(val, lbv, ubv, t):
            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [val],
                                          lb, ub, rule['Description']))
        else:
            self.passedcnt += 1
        self.totalcnt += 1

        return

    # Single Metric Test
    def single_test(self, rule: dict):
        """
        Validate if the metrics are in the required value range,
        e.g. lower_bound <= metric_value <= upper_bound.
        One metric is counted as one test in this type of test.
        One rule may include one or more metrics.
        Failure: when the metric value is not provided or the value is outside the bounds.
        This test updates self.totalcnt and self.passedcnt.
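
        Illustrative example (hypothetical rule): a SingleMetricTest rule with
        RangeLower='0', RangeUpper='100', ErrorThreshold=5 and three metrics
        counts as three tests, and each collected value must fall within the
        denormalized bounds [-5, 105] to pass.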

        @param rule: dict with the metrics to validate and the value range requirement
        """
        lbv, ubv, t = self.get_bounds(
            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'])
        metrics = rule['Metrics']
        passcnt = 0
        totalcnt = 0
        failures = dict()
        rerun = list()
        for m in metrics:
            totalcnt += 1
            result = self.get_value(m['Name'])
            if (len(result) > 0 and self.check_bound(result[0], lbv, ubv, t)) or m['Name'] in self.skiplist:
                passcnt += 1
            else:
                failures[m['Name']] = result
                rerun.append(m['Name'])

        if len(rerun) > 0 and len(rerun) < 20:
            second_results = dict()
            self.second_test(rerun, second_results)
            for name, val in second_results.items():
                if name not in failures:
                    continue
                if self.check_bound(val, lbv, ubv, t):
                    passcnt += 1
                    del failures[name]
                else:
                    failures[name] = [val]
                    self.results[0][name] = val

        self.totalcnt += totalcnt
        self.passedcnt += passcnt
        if len(failures.keys()) != 0:
            self.errlist.extend([TestError([name], self.workloads[self.wlidx], val,
                                           rule['RangeLower'], rule['RangeUpper']) for name, val in failures.items()])

        return

    def create_report(self):
        """
        Create the final report: print all collected errors and, in debug mode,
        dump the results of all workloads into a JSON file.
        """
        print(self.errlist)

        if self.debug:
            allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]}
                      for i in range(0, len(self.workloads))]
            self.json_dump(allres, self.datafname)

    def check_rule(self, testtype, metric_list):
        """
        Check if the rule uses metric(s) that do not exist on the current platform.

        @param metric_list: list of metrics from the rule.
        @return: False when any metric used in the rule is missing from the metric file (this rule should be skipped).
                 True when all metrics used in the rule are found in the metric file.
        """
        if testtype == "RelationshipTest":
            for m in metric_list:
                if m['Name'] not in self.metrics:
                    return False
        return True

    # Start of Collector and Converter
    def convert(self, data: list, metricvalues: dict):
        """
        Convert collected metric data from the -j output to a dict of {metric_name: value}.
        """
        for json_string in data:
            try:
                result = json.loads(json_string)
                if "metric-unit" in result and result["metric-unit"] != "(null)" and result["metric-unit"] != "":
                    name = result["metric-unit"].split(" ")[1] if len(result["metric-unit"].split(" ")) > 1 \
                        else result["metric-unit"]
                    metricvalues[name.lower()] = float(result["metric-value"])
            except ValueError:
                continue
        return

    def _run_perf(self, metric, workload: str):
        tool = 'perf'
        command = [tool, 'stat', '--cputype', self.cputype, '-j', '-M', f"{metric}", "-a"]
        wl = workload.split()
        command.extend(wl)
        print(" ".join(command))
        cmd = subprocess.run(command, stderr=subprocess.PIPE, encoding='utf-8')
        data = [x+'}' for x in cmd.stderr.split('}\n') if x]
        if data[0][0] != '{':
            data[0] = data[0][data[0].find('{'):]
        return data

    def collect_perf(self, workload: str):
        """
        Collect metric data with "perf stat -M" on the given workload with -a and -j.
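
        Each JSON line emitted on stderr is expected to carry "metric-value"
        and "metric-unit" fields; convert() takes the metric name from the
        second word of "metric-unit" when present. An illustrative line such as
        '{"metric-value" : "2.5", "metric-unit" : "% metric_x"}' (hypothetical
        metric and value) would be stored as {'metric_x': 2.5}.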
        """
        self.results = dict()
        print("Starting perf collection")
        print(f"Long workload: {workload}")
        collectlist = dict()
        if self.collectlist != "":
            collectlist[0] = {x for x in self.collectlist.split(",")}
        else:
            collectlist[0] = set(list(self.metrics))
        # Create metric set for relationship rules
        for rule in self.rules:
            if rule["TestType"] == "RelationshipTest":
                metrics = [m["Name"] for m in rule["Metrics"]]
                if not any(m not in collectlist[0] for m in metrics):
                    collectlist[rule["RuleIndex"]] = [
                        ",".join(list(set(metrics)))]

        for idx, metrics in collectlist.items():
            if idx == 0:
                wl = "true"
            else:
                wl = workload
            for metric in metrics:
                data = self._run_perf(metric, wl)
                if idx not in self.results:
                    self.results[idx] = dict()
                self.convert(data, self.results[idx])
        return

    def second_test(self, collectlist, second_results):
        workload = self.workloads[self.wlidx]
        for metric in collectlist:
            data = self._run_perf(metric, workload)
            self.convert(data, second_results)

    # End of Collector and Converter

    # Start of Rule Generator
    def parse_perf_metrics(self):
        """
        Read and parse the perf metric list:
        1) find metrics with '1%' or '100%' as ScaleUnit for the Percent check
        2) create the metric name list
        """
        command = ['perf', 'list', '-j', '--details', 'metrics']
        cmd = subprocess.run(command, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, encoding='utf-8')
        try:
            data = json.loads(cmd.stdout)
            for m in data:
                if 'MetricName' not in m:
                    print("Warning: no metric name")
                    continue
                if 'Unit' in m and m['Unit'] != self.cputype:
                    continue
                name = m['MetricName'].lower()
                self.metrics.add(name)
                if 'ScaleUnit' in m and (m['ScaleUnit'] == '1%' or m['ScaleUnit'] == '100%'):
                    self.pctgmetrics.add(name.lower())
        except ValueError:
            print("Error when parsing metric data")
            sys.exit()

        return

    def remove_unsupported_rules(self, rules):
        new_rules = []
        for rule in rules:
            add_rule = True
            for m in rule["Metrics"]:
                if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
                    add_rule = False
                    break
            if add_rule:
                new_rules.append(rule)
        return new_rules

    def create_rules(self):
        """
        Create the full rule list, which includes:
        1) all the rules from the relationship rules file
        2) a SingleMetricTest rule for all the 'percent' metrics

        Reindex all the rules to avoid repeated RuleIndex values.
        """
        data = self.read_json(self.rulefname)
        rules = data['RelationshipRules']
        self.skiplist = set([name.lower() for name in data['SkipList']])
        self.rules = self.remove_unsupported_rules(rules)
        pctgrule = {'RuleIndex': 0,
                    'TestType': 'SingleMetricTest',
                    'RangeLower': '0',
                    'RangeUpper': '100',
                    'ErrorThreshold': self.tolerance,
                    'Description': 'Metrics in percent unit have value within [0, 100]',
                    'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
        self.rules.append(pctgrule)

        # Re-index all rules to avoid repeated RuleIndex
        idx = 1
        for r in self.rules:
            r['RuleIndex'] = idx
            idx += 1

        if self.debug:
            # TODO: need to test and generate the file name correctly
            data = {'RelationshipRules': self.rules, 'SupportedMetrics': [
                {"MetricName": name} for name in self.metrics]}
            self.json_dump(data, self.fullrulefname)

        return
    # End of Rule Generator
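
    # For reference, create_rules() above reads a rule file shaped roughly as
    # follows; the field names match what the tests consume, but the concrete
    # values and metric names here are hypothetical:
    #
    # {
    #     "RelationshipRules": [
    #         {"RuleIndex": 1, "TestType": "RelationshipTest",
    #          "Formula": "a+b", "RangeLower": "0", "RangeUpper": "100",
    #          "ErrorThreshold": 5.0, "Description": "example rule",
    #          "Metrics": [{"Name": "metric_x", "Alias": "a"},
    #                      {"Name": "metric_y", "Alias": "b"}]}
    #     ],
    #     "SkipList": []
    # }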

    def _storewldata(self, key):
        '''
        Store all the data of one workload into the corresponding data structure for all workloads.
        @param key: key to the dictionaries (index of self.workloads).
        '''
        self.allresults[key] = self.results
        self.alltotalcnt[key] = self.totalcnt
        self.allpassedcnt[key] = self.passedcnt

    # Initialize data structures before data validation of each workload
    def _init_data(self):

        testtypes = ['PositiveValueTest',
                     'RelationshipTest', 'SingleMetricTest']
        self.results = dict()
        self.ignoremetrics = set()
        self.errlist = list()
        self.totalcnt = 0
        self.passedcnt = 0

    def test(self):
        '''
        The real entry point of the test framework.
        This function loads the validation rule JSON file and the standard metric file to create the rules
        for testing and the name map dictionaries.
        It also reads in the result JSON file for testing.

        In the test process, it passes through each rule and launches the correct test function based on the
        'TestType' field of the rule.

        The final report is written into a JSON file.
        '''
        if not self.collectlist:
            self.parse_perf_metrics()
        if not self.metrics:
            print("No metric found for testing")
            return 0
        self.create_rules()
        for i in range(0, len(self.workloads)):
            self.wlidx = i
            self._init_data()
            self.collect_perf(self.workloads[i])
            # Run positive value test
            self.pos_val_test()
            for r in self.rules:
                # skip rules that use metrics that do not exist on this platform
                testtype = r['TestType']
                if not self.check_rule(testtype, r['Metrics']):
                    continue
                if testtype == 'RelationshipTest':
                    self.relationship_test(r)
                elif testtype == 'SingleMetricTest':
                    self.single_test(r)
                else:
                    print("Unsupported Test Type: ", testtype)
            print("Workload: ", self.workloads[i])
            print("Total Test Count: ", self.totalcnt)
            print("Passed Test Count: ", self.passedcnt)
            self._storewldata(i)
        self.create_report()
        return len(self.errlist) > 0
# End of Class Validator


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Launch metric value validation")

    parser.add_argument(
        "-rule", help="Base validation rule file", required=True)
    parser.add_argument(
        "-output_dir", help="Path for validator output file, report file", required=True)
    parser.add_argument("-debug", help="Debug run, save intermediate data to files",
                        action="store_true", default=False)
    parser.add_argument(
        "-wl", help="Workload to run during data collection", default="true")
    parser.add_argument("-m", help="Metric list to validate", default="")
    parser.add_argument("-cputype", help="Only test metrics for the given CPU/PMU type",
                        default="cpu")
    args = parser.parse_args()
    outpath = Path(args.output_dir)
    reportf = Path.joinpath(outpath, 'perf_report.json')
    fullrule = Path.joinpath(outpath, 'full_rule.json')
    datafile = Path.joinpath(outpath, 'perf_data.json')

    validator = Validator(args.rule, reportf, debug=args.debug,
                          datafname=datafile, fullrulefname=fullrule, workload=args.wl,
                          metrics=args.m, cputype=args.cputype)
    ret = validator.test()

    return ret


if __name__ == "__main__":
    sys.exit(main())
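
# Illustrative invocation (script path, rule file, output directory and workload
# below are placeholders, not taken from this file):
#   python3 <this-script> -rule /path/to/rules.json -output_dir /tmp/validation \
#       -wl "perf bench sched messaging" -cputype cpu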