I need to register global hotkeys, for example f4 and f8. With the keyboard library, the next callback is not called until the first callback has returned.
In other words, the log looks like this:

    pressed f4
    end for f4
    pressed f8
    end for f8

But I want it to look like this:

    pressed f4
    pressed f8
    end for f4
    end for f8

Demo code

    # pip install keyboard
    from keyboard import add_hotkey, wait
    from time import sleep

    def on_callback(key):
        print('pressed', key)
        sleep(5)  # emulate long run task
        print('end for', key)

    add_hotkey("f4", lambda: on_callback("f4"))
    add_hotkey("f8", lambda: on_callback("f8"))
    wait('esc')

I tried to use asyncio, but nothing changed:

    pressed f4
    end for f4
    pressed f8
    end for f8

    from keyboard import add_hotkey, wait
    import asyncio

    async def on_callback(key):
        print('pressed', key)
        await asyncio.sleep(5)  # emulate long run task
        print('end for', key)

    add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
    add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))
    wait('esc')

Update 1
The keyboard library's developer advised using the call_later function, which creates a new thread for each callback, and it works the way I want.
But is there a way to do this in the same thread (using asyncio)? I didn't succeed.

    # example with 'call_later' function
    from keyboard import add_hotkey, wait, call_later
    from time import sleep

    def on_callback(key):
        print('pressed', key)
        sleep(5)  # emulate long run task
        print('end for', key)

    add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
    add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))
    wait('esc')

Update 2
Now it looks like the code below (full code on GitHub). It seems that creating new threads just to wait for an HTTP request is too heavy an operation. So I want to use asyncio in the current thread and, at the same time, continue handling other hotkeys.

    from googleapiclient.discovery import build
    from os import getenv
    from settings import get_settings
    from loguru import logger
    import keyboard


    class ScriptService():
        def __init__(self):
            # ...
            self._script = AppsScript(id)
            self._hotkeys = values["hotkeys"]

        def _register_hotkeys(self):
            self._add_hotkey(self._hotkeys["reload"], self._on_reload)
            for item in self._hotkeys["goofy"]:
                k, f = item["keys"], item["function"]
                self._add_hotkey(k, self._on_callback, args=(f, k))

        def _add_hotkey(self, keys, callback, args=()):
            # lambda bug: https://github.com/boppreh/keyboard/issues/493
            keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))

        def _on_callback(self, function, keys):
            response = self._script.run(function)


    class AppsScript():
        def __init__(self, id: str):
            self._name = getenv("API_SERVICE_NAME")
            self._version = getenv("API_VERSION")
            self._id = id

        def run(self, function: str):
            body = {"function": function}
            with build(self._name, self._version, credentials=get_credentials()) as service:
                # http request
                return service.scripts().run(scriptId=self._id, body=body).execute()

Answer
Unfortunately none of the libraries you are using are actually awaitable, so using them with asyncio is going to be a challenge. You could extract the actual http calls from the google library and then implement your own client later using an asyncio-compatible library, but that’s a lot of work just to avoid the expense of spinning up a new thread.
Fortunately there's already a way to avoid the expense of spinning up threads: use a pool of worker threads. In this approach, rather than spinning up a new thread immediately for every callback, we add the task to a queue of tasks serviced by a pool of threads we spin up in advance. That way we pay to spin each thread up only once, and after that we only pay to hand the request over to the thread, which is not nothing, but it's less than spinning up a thread.
Whilst it's possible to have asyncio manage the thread pool, in this instance it brings no advantages at all, since nothing else in your code is awaitable. (If you did want to do this, you'd use loop.run_in_executor(), taking care not to re-create the pool as noted in this question.)
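For completeness, here is a minimal sketch of what the run_in_executor route could look like. It is not wired up to the keyboard library; blocking_task is a hypothetical stand-in for the blocking HTTP call, and the pool size of 4 is an arbitrary assumption. The pool is created once at module level so it is reused rather than re-created on every call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# One pool, created once and reused for every call (size is an assumption).
POOL = ThreadPoolExecutor(max_workers=4)


def blocking_task(key, duration=0.2):
    """Hypothetical stand-in for a blocking call such as an HTTP request."""
    time.sleep(duration)
    return f"end for {key}"


async def handle(key):
    loop = asyncio.get_running_loop()
    # Hand the blocking function to the pool and await its result.
    return await loop.run_in_executor(POOL, blocking_task, key)


async def main(keys):
    # gather() returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*(handle(k) for k in keys))


if __name__ == "__main__":
    print(asyncio.run(main(["f4", "f8"])))
```

Both tasks sleep in worker threads at the same time, so the event loop adds nothing here beyond a way to await the results.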
Here’s some dummy code, which would need adapting to your classes:

    from threading import Thread
    from queue import Queue
    from time import sleep
    from random import randint


    def process(task):
        print(task["name"])
        sleep(3 + randint(0, 100) / 100)
        print(f"Task {task['name']} done")


    class WorkerThread(Thread):
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
            print("Started worker thread")
            self._open = True

        def run(self):
            while self._open:
                task = self.queue.get()  # blocks until a task is available
                process(task)
                self.queue.task_done()

        def close(self):
            print("Closing", self)
            self._open = False


    task_queue = Queue()
    THREADS = 6
    worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
    for worker in worker_threads:
        worker.daemon = True  # setDaemon() is deprecated since Python 3.10
        worker.start()

    print("Sending one task")
    task_queue.put({"name": "Task 1"})
    sleep(1)

    print("Sending a bunch of tasks")
    for i in range(1, 15):
        task_queue.put({"name": f"Task {i}"})

    print("Sleeping for a bit")
    sleep(2)

    print("Shutting down")
    # wrap this in your exit code
    task_queue.join()  # wait for everything to be done
    for worker in worker_threads:
        worker.close()

There are other approaches, but I think writing it explicitly is clearer here. Note that I have assumed your code is not CPU-bound, so it makes sense to use threads rather than processes.
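One of those other approaches is the standard library's concurrent.futures.ThreadPoolExecutor, which manages the queue and the worker threads for you. The sketch below is an illustration only: make_hotkey_handler and the log list are hypothetical helpers, and the short sleep stands in for the long-running task. The function returned by make_hotkey_handler is the kind of callable you would pass to add_hotkey.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Created once; the worker threads are reused across hotkey presses.
executor = ThreadPoolExecutor(max_workers=6)


def on_callback(key, log, duration=0.2):
    log.append(f"pressed {key}")
    sleep(duration)  # emulate long run task
    log.append(f"end for {key}")


def make_hotkey_handler(key, log):
    # Hypothetical helper: returns a zero-argument callable that queues
    # the work on the pool and returns a Future immediately.
    return lambda: executor.submit(on_callback, key, log)


def demo():
    log = []
    on_f4 = make_hotkey_handler("f4", log)
    on_f8 = make_hotkey_handler("f8", log)
    futures = [on_f4(), on_f8()]  # simulate pressing f4, then f8
    for future in futures:
        future.result()  # wait for both tasks to finish
    return log


if __name__ == "__main__":
    print(demo())
```

Because submit() returns straight away, the second handler starts before the first has finished, which is the interleaving asked for in the question.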
Incidentally, this looks very like a minimal implementation of something like celery, which is probably overkill for your needs but might be interesting to look at all the same.
BTW, I don't read Russian, but this looks like a fun project.