#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only
#
# Copyright (C) 2024 Red Hat, Inc. Daniel Bristot de Oliveira
#
# This is sample code showing how any workload can use timerlat's timer
# so rtla can measure and provide auto-analysis for the overall latency (IOW
# the response time) for a task.
#
# Before running it, you need to dispatch timerlat with the -U option in a
# terminal. Then run this script pinned to a CPU on another terminal.
# For example:
#
# timerlat_load.py 1 -p 95
#
# The "Timerlat IRQ" is the IRQ latency. The thread latency is the latency
# for the python process to get the CPU. The Ret from user Timer Latency is
# the overall latency. In other words, it is the response time for that
# activation.
#
# This is just an example, the load is reading 20MB of data from /dev/full
# It is in python because it is easy to read :-)

import argparse
import sys
import os

# Amount of data read from DATA_PATH on every activation (the sample load).
LOAD_SIZE = 20 * 1024 * 1024
DATA_PATH = "/dev/full"

parser = argparse.ArgumentParser(description='user-space timerlat thread in Python')
parser.add_argument("cpu", type=int, help='CPU to run timerlat thread')
parser.add_argument("-p", "--prio", type=int, help='FIFO priority')
args = parser.parse_args()

# Pin this process (pid 0 means the calling process) to the requested CPU.
try:
    os.sched_setaffinity(0, {args.cpu})
except Exception as e:
    print(f"Error setting affinity: {e}")
    sys.exit(1)

# Switch to SCHED_FIFO only when a priority was requested; without -p
# (or with -p 0) the current scheduling policy is left untouched.
if args.prio:
    try:
        os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(args.prio))
    except Exception as e:
        print(f"Error setting priority: {e}")
        sys.exit(1)

# Per-CPU timerlat file; the error message below suggests it is only
# available while "timerlat -U" is running.
timerlat_path = f"/sys/kernel/tracing/osnoise/per_cpu/cpu{args.cpu}/timerlat_fd"

try:
    timerlat_fd = open(timerlat_path, 'r')
except PermissionError:
    # Must come before OSError: PermissionError is a subclass of it.
    print("Permission denied. Please check your access rights.")
    sys.exit(1)
except OSError:
    print("Error opening timerlat fd, did you run timerlat -U?")
    sys.exit(1)

# The "with" blocks guarantee both files are closed on every way out,
# including the sys.exit() below and an interrupt arriving between reads.
with timerlat_fd:
    try:
        data_fd = open(DATA_PATH, 'r')
    except Exception as e:
        print(f"Error opening data fd: {e}")
        sys.exit(1)

    with data_fd:
        # The try wraps the whole loop so that a Ctrl-C delivered between
        # two iterations is handled the same way as one delivered mid-read.
        try:
            while True:
                # Hand control to timerlat, then run the sample load.
                # NOTE(review): presumably this read blocks until the next
                # timer activation - confirm against the rtla/timerlat docs.
                timerlat_fd.read(1)
                data_fd.read(LOAD_SIZE)
        except KeyboardInterrupt:
            print("Leaving")
        except OSError as e:
            print(f"I/O error occurred: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")