xref: /linux/tools/testing/selftests/hid/tests/base_device.py (revision 642f9b2d608cc2239d22957ca1dc557d07470b50)
1#!/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3# -*- coding: utf-8 -*-
4#
5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
6# Copyright (c) 2017 Red Hat, Inc.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21import dataclasses
22import fcntl
23import functools
24import libevdev
25import os
26
27try:
28    import pyudev
29except ImportError:
30    raise ImportError("UHID is not supported due to missing pyudev dependency")
31
32import logging
33
34import hidtools.hid as hid
35from hidtools.uhid import UHIDDevice
36from hidtools.util import BusType
37
38from pathlib import Path
39from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
40
41logger = logging.getLogger("hidtools.device.base_device")
42
43
44class SysfsFile(object):
45    def __init__(self, path):
46        self.path = path
47
48    def __set_value(self, value):
49        with open(self.path, "w") as f:
50            return f.write(f"{value}\n")
51
52    def __get_value(self):
53        with open(self.path) as f:
54            return f.read().strip()
55
56    @property
57    def int_value(self) -> int:
58        return int(self.__get_value())
59
60    @int_value.setter
61    def int_value(self, v: int) -> None:
62        self.__set_value(v)
63
64    @property
65    def str_value(self) -> str:
66        return self.__get_value()
67
68    @str_value.setter
69    def str_value(self, v: str) -> None:
70        self.__set_value(v)
71
72
73class LED(object):
74    def __init__(self, sys_path):
75        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
76        self.__brightness = SysfsFile(sys_path / "brightness")
77
78    @property
79    def brightness(self) -> int:
80        return self.__brightness.int_value
81
82    @brightness.setter
83    def brightness(self, value: int) -> None:
84        self.__brightness.int_value = value
85
86
87class PowerSupply(object):
88    """Represents Linux power_supply_class sysfs nodes."""
89
90    def __init__(self, sys_path):
91        self._capacity = SysfsFile(sys_path / "capacity")
92        self._status = SysfsFile(sys_path / "status")
93        self._type = SysfsFile(sys_path / "type")
94
95    @property
96    def capacity(self) -> int:
97        return self._capacity.int_value
98
99    @property
100    def status(self) -> str:
101        return self._status.str_value
102
103    @property
104    def type(self) -> str:
105        return self._type.str_value
106
107
108@dataclasses.dataclass
109class HidReadiness:
110    is_ready: bool = False
111    count: int = 0
112
113
114class HIDIsReady(object):
115    """
116    Companion class that binds to a kernel mechanism
117    and that allows to know when a uhid device is ready or not.
118
119    See :meth:`is_ready` for details.
120    """
121
122    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
123        self.uhid = uhid
124
125    def is_ready(self: "HIDIsReady") -> HidReadiness:
126        """
127        Overwrite in subclasses: should return True or False whether
128        the attached uhid device is ready or not.
129        """
130        return HidReadiness()
131
132
133class UdevHIDIsReady(HIDIsReady):
134    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
135    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
136    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}
137
138    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
139        super().__init__(uhid)
140        self._init_pyudev()
141
142    @classmethod
143    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
144        if cls._pyudev_context is None:
145            cls._pyudev_context = pyudev.Context()
146            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
147            cls._pyudev_monitor.filter_by("hid")
148            cls._pyudev_monitor.start()
149
150            UHIDDevice._append_fd_to_poll(
151                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
152            )
153
154    @classmethod
155    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
156        if cls._pyudev_monitor is None:
157            return
158        event: pyudev.Device
159        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
160            if event.action not in ["bind", "remove", "unbind"]:
161                return
162
163            logger.debug(f"udev event: {event.action} -> {event}")
164
165            id = int(event.sys_path.strip().split(".")[-1], 16)
166
167            readiness = cls._uhid_devices.setdefault(id, HidReadiness())
168
169            ready = event.action == "bind"
170            if not readiness.is_ready and ready:
171                readiness.count += 1
172
173            readiness.is_ready = ready
174
175    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
176        try:
177            return self._uhid_devices[self.uhid.hid_id]
178        except KeyError:
179            return HidReadiness()
180
181
182class EvdevMatch(object):
183    def __init__(
184        self: "EvdevMatch",
185        *,
186        requires: List[Any] = [],
187        excludes: List[Any] = [],
188        req_properties: List[Any] = [],
189        excl_properties: List[Any] = [],
190    ) -> None:
191        self.requires = requires
192        self.excludes = excludes
193        self.req_properties = req_properties
194        self.excl_properties = excl_properties
195
196    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
197        for m in self.requires:
198            if not evdev.has(m):
199                return False
200        for m in self.excludes:
201            if evdev.has(m):
202                return False
203        for p in self.req_properties:
204            if not evdev.has_property(p):
205                return False
206        for p in self.excl_properties:
207            if evdev.has_property(p):
208                return False
209        return True
210
211
212class EvdevDevice(object):
213    """
214    Represents an Evdev node and its properties.
215    This is a stub for the libevdev devices, as they are relying on
216    uevent to get the data, saving us some ioctls to fetch the names
217    and properties.
218    """
219
220    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
221        self.sysfs = sysfs
222        self.event_node: Any = None
223        self.libevdev: Optional[libevdev.Device] = None
224
225        self.uevents = {}
226        # all of the interesting properties are stored in the input uevent, so in the parent
227        # so convert the uevent file of the parent input node into a dict
228        with open(sysfs.parent / "uevent") as f:
229            for line in f.readlines():
230                key, value = line.strip().split("=")
231                self.uevents[key] = value.strip('"')
232
233        # we open all evdev nodes in order to not miss any event
234        self.open()
235
236    @property
237    def name(self: "EvdevDevice") -> str:
238        assert "NAME" in self.uevents
239
240        return self.uevents["NAME"]
241
242    @property
243    def evdev(self: "EvdevDevice") -> Path:
244        return Path("/dev/input") / self.sysfs.name
245
246    def matches_application(
247        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
248    ) -> bool:
249        if self.libevdev is None:
250            return False
251
252        if application in matches:
253            return matches[application].is_a_match(self.libevdev)
254
255        logger.error(
256            f"application '{application}' is unknown, please update/fix hid-tools"
257        )
258        assert False  # hid-tools likely needs an update
259
260    def open(self: "EvdevDevice") -> libevdev.Device:
261        self.event_node = open(self.evdev, "rb")
262        self.libevdev = libevdev.Device(self.event_node)
263
264        assert self.libevdev.fd is not None
265
266        fd = self.libevdev.fd.fileno()
267        flag = fcntl.fcntl(fd, fcntl.F_GETFD)
268        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
269
270        return self.libevdev
271
272    def close(self: "EvdevDevice") -> None:
273        if self.libevdev is not None and self.libevdev.fd is not None:
274            self.libevdev.fd.close()
275            self.libevdev = None
276        if self.event_node is not None:
277            self.event_node.close()
278            self.event_node = None
279
280
281class BaseDevice(UHIDDevice):
282    # default _application_matches that matches nothing. This needs
283    # to be set in the subclasses to have get_evdev() working
284    _application_matches: Dict[str, EvdevMatch] = {}
285
286    def __init__(
287        self,
288        name,
289        application,
290        rdesc_str: Optional[str] = None,
291        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
292        input_info=None,
293    ) -> None:
294        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
295        if rdesc_str is None and rdesc is None:
296            raise Exception("Please provide at least a rdesc or rdesc_str")
297        super().__init__()
298        if name is None:
299            name = f"uhid gamepad test {self.__class__.__name__}"
300        if input_info is None:
301            input_info = (BusType.USB, 1, 2)
302        self.name = name
303        self.info = input_info
304        self.default_reportID = None
305        self.opened = False
306        self.started = False
307        self.application = application
308        self._input_nodes: Optional[list[EvdevDevice]] = None
309        if rdesc is None:
310            assert rdesc_str is not None
311            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
312        else:
313            self.rdesc = rdesc  # type: ignore
314
315    @property
316    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
317        ps = self.walk_sysfs("power_supply", "power_supply/*")
318        if ps is None or len(ps) < 1:
319            return None
320
321        return PowerSupply(ps[0])
322
323    @property
324    def led_classes(self: "BaseDevice") -> List[LED]:
325        leds = self.walk_sysfs("led", "**/max_brightness")
326        if leds is None:
327            return []
328
329        return [LED(led.parent) for led in leds]
330
331    @property
332    def kernel_is_ready(self: "BaseDevice") -> bool:
333        return self._kernel_is_ready.is_ready().is_ready and self.started
334
335    @property
336    def kernel_ready_count(self: "BaseDevice") -> int:
337        return self._kernel_is_ready.is_ready().count
338
339    @property
340    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
341        if self._input_nodes is not None:
342            return self._input_nodes
343
344        if not self.kernel_is_ready or not self.started:
345            return []
346
347        self._input_nodes = [
348            EvdevDevice(path)
349            for path in self.walk_sysfs("input", "input/input*/event*")
350        ]
351        return self._input_nodes
352
353    def match_evdev_rule(self, application, evdev):
354        """Replace this in subclasses if the device has multiple reports
355        of the same type and we need to filter based on the actual evdev
356        node.
357
358        returning True will append the corresponding report to
359        `self.input_nodes[type]`
360        returning False  will ignore this report / type combination
361        for the device.
362        """
363        return True
364
365    def open(self):
366        self.opened = True
367
368    def _close_all_opened_evdev(self):
369        if self._input_nodes is not None:
370            for e in self._input_nodes:
371                e.close()
372
373    def __del__(self):
374        self._close_all_opened_evdev()
375
376    def close(self):
377        self.opened = False
378
379    def start(self, flags):
380        self.started = True
381
382    def stop(self):
383        self.started = False
384        self._close_all_opened_evdev()
385
386    def next_sync_events(self, application=None):
387        evdev = self.get_evdev(application)
388        if evdev is not None:
389            return list(evdev.events())
390        return []
391
392    @property
393    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
394        return self._application_matches
395
396    @application_matches.setter
397    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
398        self._application_matches = data
399
400    def get_evdev(self, application=None):
401        if application is None:
402            application = self.application
403
404        if len(self.input_nodes) == 0:
405            return None
406
407        assert self._input_nodes is not None
408
409        if len(self._input_nodes) == 1:
410            evdev = self._input_nodes[0]
411            if self.match_evdev_rule(application, evdev.libevdev):
412                return evdev.libevdev
413        else:
414            for _evdev in self._input_nodes:
415                if _evdev.matches_application(application, self.application_matches):
416                    if self.match_evdev_rule(application, _evdev.libevdev):
417                        return _evdev.libevdev
418
419    def is_ready(self):
420        """Returns whether a UHID device is ready. Can be overwritten in
421        subclasses to add extra conditions on when to consider a UHID
422        device ready. This can be:
423
424        - we need to wait on different types of input devices to be ready
425          (Touch Screen and Pen for example)
426        - we need to have at least 4 LEDs present
427          (len(self.uhdev.leds_classes) == 4)
428        - or any other combinations"""
429        return self.kernel_is_ready
430