As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel – or at least give it a chance.
I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.
I’m not (yet) sure what kind of load I might be facing. I know there is currently a 2MB cap set up on incoming messages. Theoretically we could get thousands per second (though I don’t know if we’ve seen anything like that in practice). The amount of time spent processing a message isn’t terribly important, though obviously quicker would be better.
I was looking into the Reactor pattern, and developed a small example using the `multiprocessing` library that (at least in testing) seems to work just fine. However, now/soon we’ll have the `asyncio` library available, which would handle the event loop for me.
Is there anything that could bite me by combining `asyncio` and `multiprocessing`?
Answer
You should be able to safely combine `asyncio` and `multiprocessing` without too much trouble, though you shouldn’t be using `multiprocessing` directly. The cardinal sin of `asyncio` (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use `multiprocessing` directly, any time you block to wait for a child process, you’re going to block the event loop. Obviously, this is bad.
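To make the cost of blocking concrete, here is a small sketch. The names `heartbeat`, `count_ticks_during`, `blocking_wait` and `offloaded_wait` are made up for illustration, and `time.sleep` stands in for any blocking wait on a child process. A thread pool (the loop's default executor) is used only to keep the demo short.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.01):
    """Record a timestamp every `interval` seconds until cancelled."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def count_ticks_during(work):
    """Run the coroutine function `work` next to a heartbeat task and
    return how many times the heartbeat ran while `work` was in progress."""
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    before = len(ticks)
    await work()
    after = len(ticks)
    beat.cancel()
    return after - before


async def blocking_wait():
    # Stand-in for a blocking wait such as Process.join() or Pool.apply().
    # Nothing else on the loop can run until this returns.
    time.sleep(0.2)


async def offloaded_wait():
    # Same wait, but handed to the loop's default executor, so the
    # event loop stays free to run the heartbeat in the meantime.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.2)


if __name__ == "__main__":
    print("heartbeats while blocking: ", asyncio.run(count_ticks_during(blocking_wait)))
    print("heartbeats while offloaded:", asyncio.run(count_ticks_during(offloaded_wait)))
```

Run as a script, the blocking variant should report zero heartbeats, while the offloaded variant reports several (the exact number depends on timing).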
The simplest way to avoid this is to use `BaseEventLoop.run_in_executor` to execute a function in a `concurrent.futures.ProcessPoolExecutor`. `ProcessPoolExecutor` is a process pool implemented using `multiprocessing.Process`, but `asyncio` has built-in support for executing a function in it without blocking the event loop. Here’s a simple example:
```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_func(x):
    time.sleep(x)  # Pretend this is an expensive calculation
    return x * 5


async def main():
    loop = asyncio.get_running_loop()
    # pool = multiprocessing.Pool()
    # out = pool.apply(blocking_func, args=(10,))  # This blocks the event loop.
    with ProcessPoolExecutor() as executor:
        out = await loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)


if __name__ == "__main__":
    asyncio.run(main())
```
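Because `run_in_executor` returns an awaitable future, several calls can be in flight at once. Here is a minimal sketch of fanning work out over the pool; `square` and `map_in_executor` are illustrative names, not part of any library.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Hypothetical stand-in for CPU-bound work. It is defined at module
    # level so it can be pickled and sent to a worker process.
    return x * x


async def map_in_executor(executor, func, items):
    """Submit func(item) for every item and await all results, in input order."""
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, func, item) for item in items]
    return await asyncio.gather(*futures)


async def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(await map_in_executor(executor, square, [1, 2, 3, 4]))


if __name__ == "__main__":
    asyncio.run(main())
```

Passing `None` as the executor falls back to the loop's default thread pool, which can be handy for trying the coroutine out without spawning processes.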
For the majority of cases, this function alone is good enough. If you find yourself needing other constructs from `multiprocessing`, like `Queue`, `Event`, `Manager`, etc., there is a third-party library called `aioprocessing` (full disclosure: I wrote it) that provides `asyncio`-compatible versions of all the `multiprocessing` data structures. Here’s an example demoing that:
```python
import asyncio
import time

import aioprocessing


def func(queue, event, lock, items):
    # Runs in the child process and uses ordinary blocking calls.
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item + 5)
    queue.close()


async def example(queue, event, lock):
    items = [1, 2, 3, 4, 5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, items))
    p.start()
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    await p.coro_join()


async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None)  # Shut down the worker


async def main():
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    await asyncio.gather(
        example(queue, event, lock),
        example2(queue, event, lock),
    )


if __name__ == "__main__":
    asyncio.run(main())
```
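Coming back to the setup in the question, a rough sketch of the Reactor-style server could look like the following. The event loop accepts connections and hands each message to a process pool via `run_in_executor`. Everything specific here is an assumption for illustration: the wire format (a 4-byte big-endian length prefix), one message per connection, the placeholder `process_message`, and the `demo` helper that talks to the server on an ephemeral local port.

```python
import asyncio
import struct
from concurrent.futures import ProcessPoolExecutor

MAX_MESSAGE_BYTES = 2 * 1024 * 1024  # the 2MB cap mentioned in the question


def process_message(payload):
    # Hypothetical CPU-bound handler; runs in a worker process.
    return payload.upper()


async def handle_connection(executor, reader, writer):
    """Read one length-prefixed message, process it off the loop, reply."""
    loop = asyncio.get_running_loop()
    try:
        (length,) = struct.unpack("!I", await reader.readexactly(4))
        if length > MAX_MESSAGE_BYTES:
            return  # over the cap: drop the connection without a reply
        payload = await reader.readexactly(length)
        result = await loop.run_in_executor(executor, process_message, payload)
        writer.write(struct.pack("!I", len(result)) + result)
        await writer.drain()
    except asyncio.IncompleteReadError:
        pass  # client hung up mid-message
    finally:
        writer.close()
        await writer.wait_closed()


async def demo(executor, message=b"hello"):
    """Start the server on a free local port, send one message, return the reply."""
    server = await asyncio.start_server(
        lambda r, w: handle_connection(executor, r, w), "127.0.0.1", 0
    )
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(struct.pack("!I", len(message)) + message)
        await writer.drain()
        (length,) = struct.unpack("!I", await reader.readexactly(4))
        reply = await reader.readexactly(length)
        writer.close()
        await writer.wait_closed()
    return reply


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(asyncio.run(demo(pool)))
```

A real server would call `server.serve_forever()` instead of running a single round trip. Keep in mind that each payload is pickled on its way to the worker and back, so with messages near the 2MB cap that copying is part of the per-message cost.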