1#!/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# -*- coding: utf-8 -*- 4# 5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com> 6# Copyright (c) 2017 Red Hat, Inc. 7# 8# This program is free software: you can redistribute it and/or modify 9# it under the terms of the GNU General Public License as published by 10# the Free Software Foundation; either version 2 of the License, or 11# (at your option) any later version. 12# 13# This program is distributed in the hope that it will be useful, 14# but WITHOUT ANY WARRANTY; without even the implied warranty of 15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16# GNU General Public License for more details. 17# 18# You should have received a copy of the GNU General Public License 19# along with this program. If not, see <http://www.gnu.org/licenses/>. 20 21import fcntl 22import functools 23import libevdev 24import os 25 26try: 27 import pyudev 28except ImportError: 29 raise ImportError("UHID is not supported due to missing pyudev dependency") 30 31import logging 32 33import hidtools.hid as hid 34from hidtools.uhid import UHIDDevice 35from hidtools.util import BusType 36 37from pathlib import Path 38from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union 39 40logger = logging.getLogger("hidtools.device.base_device") 41 42 43class SysfsFile(object): 44 def __init__(self, path): 45 self.path = path 46 47 def __set_value(self, value): 48 with open(self.path, "w") as f: 49 return f.write(f"{value}\n") 50 51 def __get_value(self): 52 with open(self.path) as f: 53 return f.read().strip() 54 55 @property 56 def int_value(self) -> int: 57 return int(self.__get_value()) 58 59 @int_value.setter 60 def int_value(self, v: int) -> None: 61 self.__set_value(v) 62 63 @property 64 def str_value(self) -> str: 65 return self.__get_value() 66 67 @str_value.setter 68 def str_value(self, v: str) -> None: 69 self.__set_value(v) 70 71 72class LED(object): 73 def __init__(self, sys_path): 74 self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value 75 self.__brightness = SysfsFile(sys_path / "brightness") 76 77 @property 78 def brightness(self) -> int: 79 return self.__brightness.int_value 80 81 @brightness.setter 82 def brightness(self, value: int) -> None: 83 self.__brightness.int_value = value 84 85 86class PowerSupply(object): 87 """Represents Linux power_supply_class sysfs nodes.""" 88 89 def __init__(self, sys_path): 90 self._capacity = SysfsFile(sys_path / "capacity") 91 self._status = SysfsFile(sys_path / "status") 92 self._type = SysfsFile(sys_path / "type") 93 94 @property 95 def capacity(self) -> int: 96 return self._capacity.int_value 97 98 @property 99 def status(self) -> str: 100 return self._status.str_value 101 102 @property 103 def type(self) -> str: 104 return self._type.str_value 105 106 107class HIDIsReady(object): 108 """ 109 Companion class that binds to a kernel mechanism 110 and that allows to know when a uhid device is ready or not. 111 112 See :meth:`is_ready` for details. 113 """ 114 115 def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None: 116 self.uhid = uhid 117 118 def is_ready(self: "HIDIsReady") -> bool: 119 """ 120 Overwrite in subclasses: should return True or False whether 121 the attached uhid device is ready or not. 122 """ 123 return False 124 125 126class UdevHIDIsReady(HIDIsReady): 127 _pyudev_context: ClassVar[Optional[pyudev.Context]] = None 128 _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None 129 _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {} 130 131 def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None: 132 super().__init__(uhid) 133 self._init_pyudev() 134 135 @classmethod 136 def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None: 137 if cls._pyudev_context is None: 138 cls._pyudev_context = pyudev.Context() 139 cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context) 140 cls._pyudev_monitor.filter_by("hid") 141 cls._pyudev_monitor.start() 142 143 UHIDDevice._append_fd_to_poll( 144 cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback 145 ) 146 147 @classmethod 148 def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None: 149 if cls._pyudev_monitor is None: 150 return 151 event: pyudev.Device 152 for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None): 153 if event.action not in ["bind", "remove", "unbind"]: 154 return 155 156 logger.debug(f"udev event: {event.action} -> {event}") 157 158 id = int(event.sys_path.strip().split(".")[-1], 16) 159 160 device_ready, count = cls._uhid_devices.get(id, (False, 0)) 161 162 ready = event.action == "bind" 163 if not device_ready and ready: 164 count += 1 165 cls._uhid_devices[id] = (ready, count) 166 167 def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]: 168 try: 169 return self._uhid_devices[self.uhid.hid_id] 170 except KeyError: 171 return (False, 0) 172 173 174class EvdevMatch(object): 175 def __init__( 176 self: "EvdevMatch", 177 *, 178 requires: List[Any] = [], 179 excludes: List[Any] = [], 180 req_properties: List[Any] = [], 181 excl_properties: List[Any] = [], 182 ) -> None: 183 self.requires = requires 184 self.excludes = excludes 185 self.req_properties = req_properties 186 self.excl_properties = excl_properties 187 188 def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool: 189 for m in self.requires: 190 if not evdev.has(m): 191 return False 192 for m in self.excludes: 193 if evdev.has(m): 194 return False 195 for p in self.req_properties: 196 if not evdev.has_property(p): 197 return False 198 for p in self.excl_properties: 199 if evdev.has_property(p): 200 return False 201 return True 202 203 204class EvdevDevice(object): 205 """ 206 Represents an Evdev node and its properties. 207 This is a stub for the libevdev devices, as they are relying on 208 uevent to get the data, saving us some ioctls to fetch the names 209 and properties. 210 """ 211 212 def __init__(self: "EvdevDevice", sysfs: Path) -> None: 213 self.sysfs = sysfs 214 self.event_node: Any = None 215 self.libevdev: Optional[libevdev.Device] = None 216 217 self.uevents = {} 218 # all of the interesting properties are stored in the input uevent, so in the parent 219 # so convert the uevent file of the parent input node into a dict 220 with open(sysfs.parent / "uevent") as f: 221 for line in f.readlines(): 222 key, value = line.strip().split("=") 223 self.uevents[key] = value.strip('"') 224 225 # we open all evdev nodes in order to not miss any event 226 self.open() 227 228 @property 229 def name(self: "EvdevDevice") -> str: 230 assert "NAME" in self.uevents 231 232 return self.uevents["NAME"] 233 234 @property 235 def evdev(self: "EvdevDevice") -> Path: 236 return Path("/dev/input") / self.sysfs.name 237 238 def matches_application( 239 self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch] 240 ) -> bool: 241 if self.libevdev is None: 242 return False 243 244 if application in matches: 245 return matches[application].is_a_match(self.libevdev) 246 247 logger.error( 248 f"application '{application}' is unknown, please update/fix hid-tools" 249 ) 250 assert False # hid-tools likely needs an update 251 252 def open(self: "EvdevDevice") -> libevdev.Device: 253 self.event_node = open(self.evdev, "rb") 254 self.libevdev = libevdev.Device(self.event_node) 255 256 assert self.libevdev.fd is not None 257 258 fd = self.libevdev.fd.fileno() 259 flag = fcntl.fcntl(fd, fcntl.F_GETFD) 260 fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) 261 262 return self.libevdev 263 264 def close(self: "EvdevDevice") -> None: 265 if self.libevdev is not None and self.libevdev.fd is not None: 266 self.libevdev.fd.close() 267 self.libevdev = None 268 if self.event_node is not None: 269 self.event_node.close() 270 self.event_node = None 271 272 273class BaseDevice(UHIDDevice): 274 # default _application_matches that matches nothing. This needs 275 # to be set in the subclasses to have get_evdev() working 276 _application_matches: Dict[str, EvdevMatch] = {} 277 278 def __init__( 279 self, 280 name, 281 application, 282 rdesc_str: Optional[str] = None, 283 rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None, 284 input_info=None, 285 ) -> None: 286 self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self) 287 if rdesc_str is None and rdesc is None: 288 raise Exception("Please provide at least a rdesc or rdesc_str") 289 super().__init__() 290 if name is None: 291 name = f"uhid gamepad test {self.__class__.__name__}" 292 if input_info is None: 293 input_info = (BusType.USB, 1, 2) 294 self.name = name 295 self.info = input_info 296 self.default_reportID = None 297 self.opened = False 298 self.started = False 299 self.application = application 300 self._input_nodes: Optional[list[EvdevDevice]] = None 301 if rdesc is None: 302 assert rdesc_str is not None 303 self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str) # type: ignore 304 else: 305 self.rdesc = rdesc # type: ignore 306 307 @property 308 def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]: 309 ps = self.walk_sysfs("power_supply", "power_supply/*") 310 if ps is None or len(ps) < 1: 311 return None 312 313 return PowerSupply(ps[0]) 314 315 @property 316 def led_classes(self: "BaseDevice") -> List[LED]: 317 leds = self.walk_sysfs("led", "**/max_brightness") 318 if leds is None: 319 return [] 320 321 return [LED(led.parent) for led in leds] 322 323 @property 324 def kernel_is_ready(self: "BaseDevice") -> bool: 325 return self._kernel_is_ready.is_ready()[0] and self.started 326 327 @property 328 def kernel_ready_count(self: "BaseDevice") -> int: 329 return self._kernel_is_ready.is_ready()[1] 330 331 @property 332 def input_nodes(self: "BaseDevice") -> List[EvdevDevice]: 333 if self._input_nodes is not None: 334 return self._input_nodes 335 336 if not self.kernel_is_ready or not self.started: 337 return [] 338 339 self._input_nodes = [ 340 EvdevDevice(path) 341 for path in self.walk_sysfs("input", "input/input*/event*") 342 ] 343 return self._input_nodes 344 345 def match_evdev_rule(self, application, evdev): 346 """Replace this in subclasses if the device has multiple reports 347 of the same type and we need to filter based on the actual evdev 348 node. 349 350 returning True will append the corresponding report to 351 `self.input_nodes[type]` 352 returning False will ignore this report / type combination 353 for the device. 354 """ 355 return True 356 357 def open(self): 358 self.opened = True 359 360 def _close_all_opened_evdev(self): 361 if self._input_nodes is not None: 362 for e in self._input_nodes: 363 e.close() 364 365 def __del__(self): 366 self._close_all_opened_evdev() 367 368 def close(self): 369 self.opened = False 370 371 def start(self, flags): 372 self.started = True 373 374 def stop(self): 375 self.started = False 376 self._close_all_opened_evdev() 377 378 def next_sync_events(self, application=None): 379 evdev = self.get_evdev(application) 380 if evdev is not None: 381 return list(evdev.events()) 382 return [] 383 384 @property 385 def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]: 386 return self._application_matches 387 388 @application_matches.setter 389 def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None: 390 self._application_matches = data 391 392 def get_evdev(self, application=None): 393 if application is None: 394 application = self.application 395 396 if len(self.input_nodes) == 0: 397 return None 398 399 assert self._input_nodes is not None 400 401 if len(self._input_nodes) == 1: 402 evdev = self._input_nodes[0] 403 if self.match_evdev_rule(application, evdev.libevdev): 404 return evdev.libevdev 405 else: 406 for _evdev in self._input_nodes: 407 if _evdev.matches_application(application, self.application_matches): 408 if self.match_evdev_rule(application, _evdev.libevdev): 409 return _evdev.libevdev 410 411 def is_ready(self): 412 """Returns whether a UHID device is ready. Can be overwritten in 413 subclasses to add extra conditions on when to consider a UHID 414 device ready. This can be: 415 416 - we need to wait on different types of input devices to be ready 417 (Touch Screen and Pen for example) 418 - we need to have at least 4 LEDs present 419 (len(self.uhdev.leds_classes) == 4) 420 - or any other combinations""" 421 return self.kernel_is_ready 422