#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
# Copyright (c) 2017 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import dataclasses
import fcntl
import functools
import libevdev
import os

try:
    import pyudev
except ImportError:
    raise ImportError("UHID is not supported due to missing pyudev dependency")

import logging

import hidtools.hid as hid
from hidtools.uhid import UHIDDevice
from hidtools.util import BusType

from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union

logger = logging.getLogger("hidtools.device.base_device")


class SysfsFile(object):
    """Accessor for a single sysfs attribute file.

    The file at ``path`` is re-opened on every read and every write; no
    value is cached.
    """

    def __init__(self, path):
        self.path = path

    def __set_value(self, value):
        # The value is formatted with str() semantics and newline-terminated.
        with open(self.path, "w") as f:
            return f.write(f"{value}\n")

    def __get_value(self):
        # Surrounding whitespace (including the trailing newline) is dropped.
        with open(self.path) as f:
            return f.read().strip()

    @property
    def int_value(self) -> int:
        """The file content parsed with ``int()``."""
        return int(self.__get_value())

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """The stripped file content as a string."""
        return self.__get_value()

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)


class LED(object):
    """Wraps the ``brightness``/``max_brightness`` files of one sysfs directory.

    ``max_brightness`` is read once at construction time; ``brightness``
    is read from / written to the file on every access.
    """

    def __init__(self, sys_path):
        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        return self.__brightness.int_value

    @brightness.setter
    def brightness(self, value: int) -> None:
        self.__brightness.int_value = value


class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes.

    Exposes the ``capacity``, ``status`` and ``type`` files found under
    ``sys_path``; each property re-reads its file on access.
    """

    def __init__(self, sys_path):
        self._capacity = SysfsFile(sys_path / "capacity")
        self._status = SysfsFile(sys_path / "status")
        self._type = SysfsFile(sys_path / "type")

    @property
    def capacity(self) -> int:
        return self._capacity.int_value

    @property
    def status(self) -> str:
        return self._status.str_value

    @property
    def type(self) -> str:
        return self._type.str_value


@dataclasses.dataclass
class HidReadiness:
    # whether the device is currently considered ready
    is_ready: bool = False
    # number of not-ready -> ready transitions recorded so far
    count: int = 0


class HIDIsReady(object):
    """
    Companion class that binds to a kernel mechanism
    and that allows to know when a uhid device is ready or not.

    See :meth:`is_ready` for details.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> HidReadiness:
        """
        Overwrite in subclasses: should return a :class:`HidReadiness`
        describing whether the attached uhid device is ready or not.

        This base implementation always reports a default (not ready,
        count 0) :class:`HidReadiness`.
        """
        return HidReadiness()


class UdevHIDIsReady(HIDIsReady):
    """Readiness tracker fed by udev events on the ``hid`` subsystem.

    The pyudev context, the monitor and the readiness table are class
    attributes, so they are shared by every instance: the monitor is
    created once, on the first instantiation.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    # readiness state, keyed by the integer id parsed from the udev sys_path
    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor if not done already."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            # Register the monitor fd so the callback below gets invoked
            # through UHIDDevice's polling.
            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain pending udev events and update the readiness table.

        Only "bind", "remove" and "unbind" actions are considered; a
        device is flagged ready on "bind" and not ready on the two
        others. ``count`` is bumped on every not-ready -> ready transition.
        """
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() returns None once nothing arrives within the timeout,
        # which is the sentinel that terminates the iteration.
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in ["bind", "remove", "unbind"]:
                # Skip uninteresting events but keep draining: bailing
                # out here would leave the rest of the batch unprocessed
                # until a later wakeup.
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # The last dot-separated component of the sys_path is parsed
            # as a hexadecimal number and used as the lookup key.
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            readiness = cls._uhid_devices.setdefault(hid_id, HidReadiness())

            ready = event.action == "bind"
            if not readiness.is_ready and ready:
                readiness.count += 1

            readiness.is_ready = ready

    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
        """Return the recorded readiness for ``self.uhid.hid_id``.

        A default (not ready) :class:`HidReadiness` is returned when no
        event has been recorded for that id yet.
        """
        try:
            return self._uhid_devices[self.uhid.hid_id]
        except KeyError:
            return HidReadiness()


class EvdevMatch(object):
    """Set of criteria used to decide whether an evdev node matches.

    :param requires: codes that ``evdev.has()`` must report for all entries
    :param excludes: codes that ``evdev.has()`` must report for no entry
    :param req_properties: properties that must all be present
    :param excl_properties: properties that must all be absent
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # None sentinels instead of `[]` defaults: a default list object
        # would be shared (and mutable) across every EvdevMatch instance.
        self.requires = [] if requires is None else requires
        self.excludes = [] if excludes is None else excludes
        self.req_properties = [] if req_properties is None else req_properties
        self.excl_properties = [] if excl_properties is None else excl_properties

    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
        """Return True when ``evdev`` satisfies every criterion."""
        for m in self.requires:
            if not evdev.has(m):
                return False
        for m in self.excludes:
            if evdev.has(m):
                return False
        for p in self.req_properties:
            if not evdev.has_property(p):
                return False
        for p in self.excl_properties:
            if evdev.has_property(p):
                return False
        return True


class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        self.uevents = {}
        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # Split on the first "=" only, so values that themselves
                # contain "=" are kept whole.
                key, sep, value = line.strip().partition("=")
                if not sep:
                    # blank line or line without "=": nothing to record
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """The ``NAME`` entry of the parent uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node under ``/dev/input``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
    ) -> bool:
        """Check this node against the match rule registered for ``application``.

        Returns False when the node is not opened. An ``application``
        missing from ``matches`` is logged as an error and trips an
        assertion.
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        logger.error(
            f"application '{application}' is unknown, please update/fix hid-tools"
        )
        assert False  # hid-tools likely needs an update

    def open(self: "EvdevDevice") -> libevdev.Device:
        """Open the device node, wrap it in libevdev and make it non-blocking."""
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # File *status* flags must be read with F_GETFL: F_GETFD returns
        # the descriptor flags (FD_CLOEXEC), which are a different set
        # and must not be fed back through F_SETFL.
        flag = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev fd and the underlying file object, if any."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None


class BaseDevice(UHIDDevice):
    """UHID device that also tracks kernel readiness and its evdev nodes."""

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """Set up the device description.

        :param name: device name; a default one derived from the class
            name is used when None
        :param application: application used by default in :meth:`get_evdev`
        :param rdesc_str: human-readable report descriptor, parsed only
            when ``rdesc`` is None
        :param rdesc: report descriptor, used as-is when provided
        :param input_info: bus/vendor/product information; defaults to
            ``(BusType.USB, 1, 2)``
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
        """
        # Instantiated before super().__init__(), as in the original
        # ordering. NOTE(review): presumably so the udev monitor is
        # listening before the uhid device is set up - confirm.
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """The first power supply found in sysfs, or None when there is none."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One :class:`LED` per ``max_brightness`` file found in sysfs."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the readiness tracker reports ready and the device is started."""
        return self._kernel_is_ready.is_ready().is_ready and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Number of ready transitions recorded by the readiness tracker."""
        return self._kernel_is_ready.is_ready().count

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """The evdev nodes of this device, discovered once and then cached.

        An empty list is returned (and nothing is cached) while the
        device is not ready or not started.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        self._input_nodes = [
            EvdevDevice(path)
            for path in self.walk_sysfs("input", "input/input*/event*")
        ]
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        """Record that the device has been opened."""
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every cached evdev node."""
        # getattr: this also runs from __del__, possibly on an instance
        # whose __init__ raised before `_input_nodes` was assigned.
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for e in nodes:
                e.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        """Record that the device has been closed."""
        self.opened = False

    def start(self, flags):
        """Record that the device has been started (``flags`` is unused here)."""
        self.started = True

    def stop(self):
        """Record that the device has been stopped and close its evdev nodes."""
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the pending events of the matching evdev node as a list.

        An empty list is returned when no node matches ``application``.
        """
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """Return the libevdev device matching ``application``, or None.

        ``self.application`` is used when ``application`` is None. With a
        single input node only :meth:`match_evdev_rule` is consulted;
        with several, the node must also match the rule registered in
        :attr:`application_matches`.
        """
        if application is None:
            application = self.application

        if len(self.input_nodes) == 0:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        # no node satisfied the rules
        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready