| #!/bin/env python3 |
| # SPDX-License-Identifier: GPL-2.0 |
| # -*- coding: utf-8 -*- |
| # |
| # Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com> |
| # Copyright (c) 2017 Red Hat, Inc. |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| import fcntl |
| import functools |
| import libevdev |
| import os |
| |
| try: |
| import pyudev |
| except ImportError: |
| raise ImportError("UHID is not supported due to missing pyudev dependency") |
| |
| import logging |
| |
| import hidtools.hid as hid |
| from hidtools.uhid import UHIDDevice |
| from hidtools.util import BusType |
| |
| from pathlib import Path |
| from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union |
| |
| logger = logging.getLogger("hidtools.device.base_device") |
| |
| |
| class SysfsFile(object): |
| def __init__(self, path): |
| self.path = path |
| |
| def __set_value(self, value): |
| with open(self.path, "w") as f: |
| return f.write(f"{value}\n") |
| |
| def __get_value(self): |
| with open(self.path) as f: |
| return f.read().strip() |
| |
| @property |
| def int_value(self) -> int: |
| return int(self.__get_value()) |
| |
| @int_value.setter |
| def int_value(self, v: int) -> None: |
| self.__set_value(v) |
| |
| @property |
| def str_value(self) -> str: |
| return self.__get_value() |
| |
| @str_value.setter |
| def str_value(self, v: str) -> None: |
| self.__set_value(v) |
| |
| |
| class LED(object): |
| def __init__(self, sys_path): |
| self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value |
| self.__brightness = SysfsFile(sys_path / "brightness") |
| |
| @property |
| def brightness(self) -> int: |
| return self.__brightness.int_value |
| |
| @brightness.setter |
| def brightness(self, value: int) -> None: |
| self.__brightness.int_value = value |
| |
| |
| class PowerSupply(object): |
| """Represents Linux power_supply_class sysfs nodes.""" |
| |
| def __init__(self, sys_path): |
| self._capacity = SysfsFile(sys_path / "capacity") |
| self._status = SysfsFile(sys_path / "status") |
| self._type = SysfsFile(sys_path / "type") |
| |
| @property |
| def capacity(self) -> int: |
| return self._capacity.int_value |
| |
| @property |
| def status(self) -> str: |
| return self._status.str_value |
| |
| @property |
| def type(self) -> str: |
| return self._type.str_value |
| |
| |
| class HIDIsReady(object): |
| """ |
| Companion class that binds to a kernel mechanism |
| and that allows to know when a uhid device is ready or not. |
| |
| See :meth:`is_ready` for details. |
| """ |
| |
| def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None: |
| self.uhid = uhid |
| |
| def is_ready(self: "HIDIsReady") -> bool: |
| """ |
| Overwrite in subclasses: should return True or False whether |
| the attached uhid device is ready or not. |
| """ |
| return False |
| |
| |
| class UdevHIDIsReady(HIDIsReady): |
| _pyudev_context: ClassVar[Optional[pyudev.Context]] = None |
| _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None |
| _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {} |
| |
| def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None: |
| super().__init__(uhid) |
| self._init_pyudev() |
| |
| @classmethod |
| def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None: |
| if cls._pyudev_context is None: |
| cls._pyudev_context = pyudev.Context() |
| cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context) |
| cls._pyudev_monitor.filter_by("hid") |
| cls._pyudev_monitor.start() |
| |
| UHIDDevice._append_fd_to_poll( |
| cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback |
| ) |
| |
| @classmethod |
| def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None: |
| if cls._pyudev_monitor is None: |
| return |
| event: pyudev.Device |
| for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None): |
| if event.action not in ["bind", "remove", "unbind"]: |
| return |
| |
| logger.debug(f"udev event: {event.action} -> {event}") |
| |
| id = int(event.sys_path.strip().split(".")[-1], 16) |
| |
| device_ready, count = cls._uhid_devices.get(id, (False, 0)) |
| |
| ready = event.action == "bind" |
| if not device_ready and ready: |
| count += 1 |
| cls._uhid_devices[id] = (ready, count) |
| |
| def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]: |
| try: |
| return self._uhid_devices[self.uhid.hid_id] |
| except KeyError: |
| return (False, 0) |
| |
| |
| class EvdevMatch(object): |
| def __init__( |
| self: "EvdevMatch", |
| *, |
| requires: List[Any] = [], |
| excludes: List[Any] = [], |
| req_properties: List[Any] = [], |
| excl_properties: List[Any] = [], |
| ) -> None: |
| self.requires = requires |
| self.excludes = excludes |
| self.req_properties = req_properties |
| self.excl_properties = excl_properties |
| |
| def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool: |
| for m in self.requires: |
| if not evdev.has(m): |
| return False |
| for m in self.excludes: |
| if evdev.has(m): |
| return False |
| for p in self.req_properties: |
| if not evdev.has_property(p): |
| return False |
| for p in self.excl_properties: |
| if evdev.has_property(p): |
| return False |
| return True |
| |
| |
| class EvdevDevice(object): |
| """ |
| Represents an Evdev node and its properties. |
| This is a stub for the libevdev devices, as they are relying on |
| uevent to get the data, saving us some ioctls to fetch the names |
| and properties. |
| """ |
| |
| def __init__(self: "EvdevDevice", sysfs: Path) -> None: |
| self.sysfs = sysfs |
| self.event_node: Any = None |
| self.libevdev: Optional[libevdev.Device] = None |
| |
| self.uevents = {} |
| # all of the interesting properties are stored in the input uevent, so in the parent |
| # so convert the uevent file of the parent input node into a dict |
| with open(sysfs.parent / "uevent") as f: |
| for line in f.readlines(): |
| key, value = line.strip().split("=") |
| self.uevents[key] = value.strip('"') |
| |
| # we open all evdev nodes in order to not miss any event |
| self.open() |
| |
| @property |
| def name(self: "EvdevDevice") -> str: |
| assert "NAME" in self.uevents |
| |
| return self.uevents["NAME"] |
| |
| @property |
| def evdev(self: "EvdevDevice") -> Path: |
| return Path("/dev/input") / self.sysfs.name |
| |
| def matches_application( |
| self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch] |
| ) -> bool: |
| if self.libevdev is None: |
| return False |
| |
| if application in matches: |
| return matches[application].is_a_match(self.libevdev) |
| |
| logger.error( |
| f"application '{application}' is unknown, please update/fix hid-tools" |
| ) |
| assert False # hid-tools likely needs an update |
| |
| def open(self: "EvdevDevice") -> libevdev.Device: |
| self.event_node = open(self.evdev, "rb") |
| self.libevdev = libevdev.Device(self.event_node) |
| |
| assert self.libevdev.fd is not None |
| |
| fd = self.libevdev.fd.fileno() |
| flag = fcntl.fcntl(fd, fcntl.F_GETFD) |
| fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) |
| |
| return self.libevdev |
| |
| def close(self: "EvdevDevice") -> None: |
| if self.libevdev is not None and self.libevdev.fd is not None: |
| self.libevdev.fd.close() |
| self.libevdev = None |
| if self.event_node is not None: |
| self.event_node.close() |
| self.event_node = None |
| |
| |
| class BaseDevice(UHIDDevice): |
| # default _application_matches that matches nothing. This needs |
| # to be set in the subclasses to have get_evdev() working |
| _application_matches: Dict[str, EvdevMatch] = {} |
| |
| def __init__( |
| self, |
| name, |
| application, |
| rdesc_str: Optional[str] = None, |
| rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None, |
| input_info=None, |
| ) -> None: |
| self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self) |
| if rdesc_str is None and rdesc is None: |
| raise Exception("Please provide at least a rdesc or rdesc_str") |
| super().__init__() |
| if name is None: |
| name = f"uhid gamepad test {self.__class__.__name__}" |
| if input_info is None: |
| input_info = (BusType.USB, 1, 2) |
| self.name = name |
| self.info = input_info |
| self.default_reportID = None |
| self.opened = False |
| self.started = False |
| self.application = application |
| self._input_nodes: Optional[list[EvdevDevice]] = None |
| if rdesc is None: |
| assert rdesc_str is not None |
| self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str) # type: ignore |
| else: |
| self.rdesc = rdesc # type: ignore |
| |
| @property |
| def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]: |
| ps = self.walk_sysfs("power_supply", "power_supply/*") |
| if ps is None or len(ps) < 1: |
| return None |
| |
| return PowerSupply(ps[0]) |
| |
| @property |
| def led_classes(self: "BaseDevice") -> List[LED]: |
| leds = self.walk_sysfs("led", "**/max_brightness") |
| if leds is None: |
| return [] |
| |
| return [LED(led.parent) for led in leds] |
| |
| @property |
| def kernel_is_ready(self: "BaseDevice") -> bool: |
| return self._kernel_is_ready.is_ready()[0] and self.started |
| |
| @property |
| def kernel_ready_count(self: "BaseDevice") -> int: |
| return self._kernel_is_ready.is_ready()[1] |
| |
| @property |
| def input_nodes(self: "BaseDevice") -> List[EvdevDevice]: |
| if self._input_nodes is not None: |
| return self._input_nodes |
| |
| if not self.kernel_is_ready or not self.started: |
| return [] |
| |
| self._input_nodes = [ |
| EvdevDevice(path) |
| for path in self.walk_sysfs("input", "input/input*/event*") |
| ] |
| return self._input_nodes |
| |
| def match_evdev_rule(self, application, evdev): |
| """Replace this in subclasses if the device has multiple reports |
| of the same type and we need to filter based on the actual evdev |
| node. |
| |
| returning True will append the corresponding report to |
| `self.input_nodes[type]` |
| returning False will ignore this report / type combination |
| for the device. |
| """ |
| return True |
| |
| def open(self): |
| self.opened = True |
| |
| def _close_all_opened_evdev(self): |
| if self._input_nodes is not None: |
| for e in self._input_nodes: |
| e.close() |
| |
| def __del__(self): |
| self._close_all_opened_evdev() |
| |
| def close(self): |
| self.opened = False |
| |
| def start(self, flags): |
| self.started = True |
| |
| def stop(self): |
| self.started = False |
| self._close_all_opened_evdev() |
| |
| def next_sync_events(self, application=None): |
| evdev = self.get_evdev(application) |
| if evdev is not None: |
| return list(evdev.events()) |
| return [] |
| |
| @property |
| def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]: |
| return self._application_matches |
| |
| @application_matches.setter |
| def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None: |
| self._application_matches = data |
| |
| def get_evdev(self, application=None): |
| if application is None: |
| application = self.application |
| |
| if len(self.input_nodes) == 0: |
| return None |
| |
| assert self._input_nodes is not None |
| |
| if len(self._input_nodes) == 1: |
| evdev = self._input_nodes[0] |
| if self.match_evdev_rule(application, evdev.libevdev): |
| return evdev.libevdev |
| else: |
| for _evdev in self._input_nodes: |
| if _evdev.matches_application(application, self.application_matches): |
| if self.match_evdev_rule(application, _evdev.libevdev): |
| return _evdev.libevdev |
| |
| def is_ready(self): |
| """Returns whether a UHID device is ready. Can be overwritten in |
| subclasses to add extra conditions on when to consider a UHID |
| device ready. This can be: |
| |
| - we need to wait on different types of input devices to be ready |
| (Touch Screen and Pen for example) |
| - we need to have at least 4 LEDs present |
| (len(self.uhdev.leds_classes) == 4) |
| - or any other combinations""" |
| return self.kernel_is_ready |