Skip to content
Advertisement

How to async handle callback from keyboard hotkeys?

Need to register global hotkeys. For example, f4 and f8. With keyboard library while first callback didn’t return, the next won’t call.

Another words, logs like this

pressed f4
end for f4
pressed f8
end for f8

But I want to like this

pressed f4
pressed f8
end for f4
end for f8

Demo code

# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))

wait('esc')

I tried to use asyncio, but nothing changed

pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio

async def on_callback(key):
    print('pressed', key)
    await asyncio.sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))

wait('esc')

Update 1

Keyboard library’s developer gave advise to use call_later function that create new thread for each callback and it’s works like I want.

But is there way to do such task in the same thread (use asyncio)? I didn’t succeed.

# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))

wait('esc')

Update 2

Now it’s look like below (full code on github). I seems that creating new threads in order to wait http request is too heavy operation. Thus I want to use asyncio in current thread and the same time continue handle other hotkeys.

from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard

class ScriptService():

    def __init__(self):
        # ...
        self._script = AppsScript(id)
        self._hotkeys = values["hotkeys"]

    def _register_hotkeys(self):
        self._add_hotkey(self._hotkeys["reload"], self._on_reload)
        for item in self._hotkeys["goofy"]:
            k, f = item["keys"], item["function"]
            self._add_hotkey(k, self._on_callback, args=(f, k))

    def _add_hotkey(self, keys, callback, args=()):
        # lambda bug: https://github.com/boppreh/keyboard/issues/493
        keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))

    def _on_callback(self, function, keys):
        response = self._script.run(function)

class AppsScript():

    def __init__(self, id: str):
        self._name = getenv("API_SERVICE_NAME")
        self._version = getenv("API_VERSION")
        self._id = id

    def run(self, function: str):
        body = {"function": function}
        with build(self._name, self._version, credentials=get_credentials()) as service:
            # http request
            return service.scripts().run(scriptId=self._id, body=body).execute()

Advertisement

Answer

Unfortunately none of the libraries you are using are actually awaitable, so using them with asyncio is going to be a challenge. You could extract the actual http calls from the google library and then implement your own client later using an asyncio-compatible library, but that’s a lot of work just to avoid the expense of spinning up a new thread.

Fortunately there’s already a way to avoid the expensive of spinning up threads: use a pool of worker threads. In this approach, rather than spinning up a new thread immediately for every callback, we add the task to a queue of tasks serviced by a pool of threads we spin up in advance. That way we pay to spin the thread up only one, and after that we only pay to serialise the request to the thread—which is not nothing, but it’s less than spinning up a thread.

Whilst it’s possible to have asyncio manage the thread pool, in this instance it brings no advantages at all, since nothing else in your code is awaitable. (If you did want to do this, you’d use loop.run_in_exeuctor(), taking care not to re-create the pool as noted in this question.)

Here’s some dummy code, which would need adapting to your classes:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint


def process(task):
    print(task["name"])
    sleep(3 + randint(0, 100) / 100)
    print(f"Task {task['name']} done")


class WorkerThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        print("Started worker thread")
        self._open = True

    def run(self):
        while self._open:
            task = self.queue.get()
            process(task)
            self.queue.task_done()

    def close(self):
        print("Closing", self)
        self._open = False


task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
    worker.setDaemon(True)
    worker.start()


print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)

print("Sending a bunch of tasks")
for i in range(1, 15):
    task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)

print("Shutting down")

# wrap this in your exit code
task_queue.join()  # wait for everything to be done
for worker in worker_threads:
    worker.close()

There are other approaches, but I think writing it explicitly is clearer here. Note that I have assumed your code is not cpu-bound, so it makes sense to use threads rather than processes.

Incidentally, this looks very like a minimal implementation of something like celery, which is probably overkill for your needs but might we interesting to look at all the same.

BTW, I don’t read russian, but this looks like a fun project.

User contributions licensed under: CC BY-SA
8 People found this is helpful
Advertisement