Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single-Threaded Service #592

Draft
wants to merge 15 commits into
base: beta
Choose a base branch
from
4 changes: 2 additions & 2 deletions bin/input-remapper-control
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ def communicate(options, daemon):
logger.info('Autoloading all')
# timeout is not documented, for more info see
# https://github.com/LEW21/pydbus/blob/master/pydbus/proxy_method.py
daemon.autoload(timeout=10)
daemon.autoload()
else:
group = require_group()
logger.info('Asking daemon to autoload for %s', options.device)
daemon.autoload_single(group.key, timeout=2)
daemon.autoload_single(group.key)

if options.command == START:
group = require_group()
Expand Down
1 change: 0 additions & 1 deletion bin/input-remapper-service
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ if __name__ == '__main__':
log_info('input-remapper-service')

daemon = Daemon()
daemon.publish()
daemon.run()
165 changes: 85 additions & 80 deletions inputremapper/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@

https://github.com/LEW21/pydbus/tree/cc407c8b1d25b7e28a6d661a29f9e661b1c9b964/examples/clientserver # noqa pylint: disable=line-too-long
"""


import atexit
import asyncio
import signal
import json
import os
import sys
import time
import tracemalloc
import typing
from pathlib import PurePath
from typing import Protocol, Dict, Optional

import gi
from dbus_next.aio import MessageBus
from dbus_next import BusType, service, RequestNameReply
from pydbus import SystemBus

import gi

gi.require_version("GLib", "2.0")
from gi.repository import GLib

Expand All @@ -45,11 +49,14 @@
from inputremapper.configs.system_mapping import system_mapping
from inputremapper.groups import groups
from inputremapper.configs.paths import get_config_path, sanitize_path_component, USER
from inputremapper.injection.macros.macro import macro_variables
from inputremapper.injection.global_uinputs import global_uinputs

tracemalloc.start()

BUS_NAME = "inputremapper.Control"
PATH_NAME = "/inputremapper/Control"
INTERFACE_NAME = "inputremapper.Control"

# timeout in seconds, see
# https://github.com/LEW21/pydbus/blob/cc407c8b1d25b7e28a6d661a29f9e661b1c9b964/pydbus/proxy.py
BUS_TIMEOUT = 10
Expand Down Expand Up @@ -145,7 +152,19 @@ def hello(self, out: str) -> str:
...


class Daemon:
def method(name: str = None, disabled: bool = False):
# this is a workaround for https://github.com/altdesktop/python-dbus-next/issues/119
@typing.no_type_check_decorator
def fixed_decorator(fn):
# we don't actually decorate the function
# dbus-next only cares about the __dict__
fn.__dict__ = service.method(name, disabled)(fn).__dict__
return fn

return fixed_decorator


class Daemon(service.ServiceInterface):
"""Starts injecting keycodes based on the configuration.

Can be talked to either over dbus or by instantiating it.
Expand All @@ -156,44 +175,13 @@ class Daemon:
on its own.
"""

# https://dbus.freedesktop.org/doc/dbus-specification.html#type-system
dbus = f"""
<node>
<interface name='{BUS_NAME}'>
<method name='stop_injecting'>
<arg type='s' name='group_key' direction='in'/>
</method>
<method name='get_state'>
<arg type='s' name='group_key' direction='in'/>
<arg type='s' name='response' direction='out'/>
</method>
<method name='start_injecting'>
<arg type='s' name='group_key' direction='in'/>
<arg type='s' name='preset' direction='in'/>
<arg type='b' name='response' direction='out'/>
</method>
<method name='stop_all'>
</method>
<method name='set_config_dir'>
<arg type='s' name='config_dir' direction='in'/>
</method>
<method name='autoload'>
</method>
<method name='autoload_single'>
<arg type='s' name='group_key' direction='in'/>
</method>
<method name='hello'>
<arg type='s' name='out' direction='in'/>
<arg type='s' name='response' direction='out'/>
</method>
</interface>
</node>
"""

def __init__(self):
"""Constructs the daemon."""
logger.debug("Creating daemon")
super().__init__(INTERFACE_NAME)
self.injectors: Dict[str, Injector] = {}
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._bus: Optional[MessageBus] = None

self.config_dir = None

Expand All @@ -207,10 +195,7 @@ def __init__(self):
self.autoload_history = AutoloadHistory()
self.refreshed_devices_at = 0

atexit.register(self.stop_all)

# initialize stuff that is needed alongside the daemon process
macro_variables.start()
signal.signal(signal.SIGINT, self.quit)

@classmethod
def connect(cls, fallback: bool = True) -> DaemonProxy:
Expand All @@ -225,7 +210,7 @@ def connect(cls, fallback: bool = True) -> DaemonProxy:
try:
interface = bus.get(BUS_NAME, timeout=BUS_TIMEOUT)
logger.info("Connected to the service")
except GLib.GError as error:
except GLib.Error as error:
if not fallback:
logger.error("Service not running? %s", error)
return None
Expand All @@ -250,7 +235,7 @@ def connect(cls, fallback: bool = True) -> DaemonProxy:
try:
interface = bus.get(BUS_NAME, timeout=BUS_TIMEOUT)
break
except GLib.GError as error:
except GLib.Error as error:
logger.debug("Attempt %d to reach the service failed:", attempt + 1)
logger.debug('"%s"', error)
time.sleep(0.2)
Expand All @@ -261,26 +246,40 @@ def connect(cls, fallback: bool = True) -> DaemonProxy:
if USER != "root":
config_path = get_config_path()
logger.debug('Telling service about "%s"', config_path)
interface.set_config_dir(get_config_path(), timeout=2)
interface.set_config_dir(get_config_path())

return interface

def publish(self):
"""Make the dbus interface available."""
bus = SystemBus()
try:
bus.publish(BUS_NAME, self)
except RuntimeError as error:
logger.error("Is the service already running? (%s)", str(error))
sys.exit(9)

def run(self):
"""Start the daemons loop. Blocks until the daemon stops."""
loop = GLib.MainLoop()
"""Start the event loop and publish the daemon.
Blocks until the daemon stops."""
self._loop = loop = asyncio.get_event_loop()

async def task():
self._bus = bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
bus.export(path=PATH_NAME, interface=self)
if RequestNameReply.PRIMARY_OWNER != await bus.request_name(BUS_NAME):
logger.error("Is the service already running?")
sys.exit(9)

loop.run_until_complete(task())
logger.debug("Running daemon")
loop.run()
loop.run_forever()

def quit(self, *_):
self.stop_all()
self._bus.unexport(path=PATH_NAME, interface=self)

def refresh(self, group_key: Optional[str] = None):
async def stop_later():
# give all injections time to reset uinputs
await asyncio.sleep(0.2)

# loop.run_forever will return
self._loop.stop()

asyncio.ensure_future(stop_later())

async def refresh(self, group_key: Optional[str] = None):
"""Refresh groups if the specified group is unknown.

Parameters
Expand All @@ -293,18 +292,19 @@ def refresh(self, group_key: Optional[str] = None):
logger.debug("Refreshing because last info is too old")
# it may take a little bit of time until devices are visible after
# changes
time.sleep(0.1)
await asyncio.sleep(0.1)
groups.refresh()
self.refreshed_devices_at = now
return

if not groups.find(key=group_key):
logger.debug('Refreshing because "%s" is unknown', group_key)
time.sleep(0.1)
await asyncio.sleep(0.1)
groups.refresh()
self.refreshed_devices_at = now

def stop_injecting(self, group_key: str):
@method()
def stop_injecting(self, group_key: "s"):
"""Stop injecting the preset mappings for a single device."""
if self.injectors.get(group_key) is None:
logger.debug(
Expand All @@ -316,13 +316,15 @@ def stop_injecting(self, group_key: str):
self.injectors[group_key].stop_injecting()
self.autoload_history.forget(group_key)

def get_state(self, group_key: str) -> InjectorState:
@method()
def get_state(self, group_key: "s") -> "s":
"""Get the injectors state."""
injector = self.injectors.get(group_key)
return injector.get_state() if injector else InjectorState.UNKNOWN
# if there is no injector it surely is stopped
return injector.get_state() if injector else InjectorState.STOPPED

@remove_timeout
def set_config_dir(self, config_dir: str):
@method()
def set_config_dir(self, config_dir: "s"):
"""All future operations will use this config dir.

Existing injections (possibly of the previous user) will be kept
Expand All @@ -342,15 +344,15 @@ def set_config_dir(self, config_dir: str):
self.config_dir = config_dir
global_config.load_config(config_path)

def _autoload(self, group_key: str):
async def _autoload(self, group_key: str):
"""Check if autoloading is a good idea, and if so do it.

Parameters
----------
group_key
unique identifier used by the groups object
"""
self.refresh(group_key)
await self.refresh(group_key)

group = groups.find(key=group_key)
if group is None:
Expand Down Expand Up @@ -379,11 +381,11 @@ def _autoload(self, group_key: str):
)
return

self.start_injecting(group.key, preset)
await self.start_injecting(group.key, preset)
self.autoload_history.remember(group.key, preset)

@remove_timeout
def autoload_single(self, group_key: str):
@method()
async def autoload_single(self, group_key: "s"):
"""Inject the configured autoload preset for the device.

If the preset is already being injected, it won't autoload it again.
Expand All @@ -407,10 +409,10 @@ def autoload_single(self, group_key: str):
)
return

self._autoload(group_key)
await self._autoload(group_key)

@remove_timeout
def autoload(self):
@method()
async def autoload(self):
"""Load all autoloaded presets for the current config_dir.

If the preset is already being injected, it won't autoload it again.
Expand All @@ -431,9 +433,10 @@ def autoload(self):
return

for group_key, _ in autoload_presets:
self._autoload(group_key)
await self._autoload(group_key)

def start_injecting(self, group_key: str, preset: str) -> bool:
@method()
async def start_injecting(self, group_key: "s", preset: "s") -> "b":
"""Start injecting the preset for the device.

Returns True on success. If an injection is already ongoing for
Expand All @@ -448,7 +451,7 @@ def start_injecting(self, group_key: str, preset: str) -> bool:
"""
logger.info('Request to start injecting for "%s"', group_key)

self.refresh(group_key)
await self.refresh(group_key)

if self.config_dir is None:
logger.error(
Expand Down Expand Up @@ -514,7 +517,7 @@ def start_injecting(self, group_key: str, preset: str) -> bool:

try:
injector = Injector(group, preset)
injector.start()
await injector.start_injecting()
self.injectors[group.key] = injector
except OSError:
# I think this will never happen, probably leftover from
Expand All @@ -523,13 +526,15 @@ def start_injecting(self, group_key: str, preset: str) -> bool:

return True

@method()
def stop_all(self):
"""Stop all injections."""
logger.info("Stopping all injections")
for group_key in list(self.injectors.keys()):
self.stop_injecting(group_key)

def hello(self, out: str):
@method()
def hello(self, out: "s") -> "s":
"""Used for tests."""
logger.info('Received "%s" from client', out)
return out
1 change: 1 addition & 0 deletions inputremapper/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def refresh(self):
result is cached. Use refresh_groups if you need up to date
devices.
"""
# todo: make sure this is non blocking or can be awaited
pipe = multiprocessing.Pipe()
_FindGroups(pipe[1]).start()
# block until groups are available
Expand Down
3 changes: 1 addition & 2 deletions inputremapper/gui/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
Any,
)

import gi
from evdev.ecodes import EV_KEY, EV_REL, EV_ABS

from gi.repository import Gtk
Expand Down Expand Up @@ -490,7 +489,7 @@ def start_key_recording(self):
Updates the active_mapping.input_combination with the recorded events.
"""
state = self.data_manager.get_state()
if state == InjectorState.RUNNING or state == InjectorState.STARTING:
if state == InjectorState.RUNNING:
self.data_manager.stop_combination_recording()
self.message_broker.signal(MessageType.recording_finished)
self.show_status(CTX_ERROR, _('Use "Stop" to stop before editing'))
Expand Down
6 changes: 5 additions & 1 deletion inputremapper/injection/global_uinputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ def __init__(self, *args, **kwargs):
logger.debug('creating UInput device: "%s"', name)
super().__init__(*args, **kwargs)

# this will never change, so we cache it since evdev runs an expensive loop to
# gather the capabilities. (can_emit is called regularly)
self._capabilities_cache = self.capabilities(absinfo=False)

def can_emit(self, event: Tuple[int, int, int]):
"""Check if an event can be emitted by the UIinput.

Wrong events might be injected if the group mappings are wrong,
"""
return event[1] in self.capabilities(absinfo=False).get(event[0], [])
return event[1] in self._capabilities_cache.get(event[0], [])


class FrontendUInput:
Expand Down
Loading