I am trying to understand generator-based concurrency by following the talk by D. Beazley. I don't understand the purpose of future_monitor and was wondering what the consequences of removing that function would be. Here is his implementation of the asynchronous server and, right after it, my implementation without the future_monitor function.
Perhaps I misunderstand how futures and add_done_callback work in the concurrent.futures library.
I do not know:
- what runs in the main process and what is delegated to another process;
- how add_done_callback interacts with the main process: will the callback interrupt the main thread as soon as the future is done?
As I understand:
- the function submitted to the pool is executed in a process other than the main process;
- the return value of that other process is stored in the future object when the function returns (I imagine a kind of message queue between the two processes, through which the future gets the result passed back by the third-party process executing fib(n));
- add_done_callback is a non-blocking function that will suspend the main thread when the future is done and call the callback immediately (suspending the main process?).
# server.py
# Fib microservice

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

from socket import *
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool  # rebinds Pool: this one is used
import os
import psutil

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')

def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print("Connection", addr, client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  # blocking
        print(f'Receive {req} from Client {client.getpeername()} ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()  # Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  # blocking
        print(f'Send {resp} to Client {client.getpeername()} ')
    print("Closed")

if __name__=='__main__':
    # Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    future_notify, future_event = socketpair()
    tasks = deque()
    # Main tasks
    tasks.append(future_monitor())
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    # Run Event loop
    run()
# server.py
# Fib microservice

from socket import *
from fib import fib
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool  # rebinds Pool: this one is used

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print("Connection", addr, client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  # blocking
        print(f'Receive {req} from Client {client.getpeername()} ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()  # Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  # blocking
        print(f'Send {resp} to Client {client.getpeername()} ')
    print("Closed")

if __name__=='__main__':
    # Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    tasks = deque()
    # Main tasks
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    # Run Event loop
    run()
Answer
I may have some elements of an answer.
To be clear, we will talk about two processes:
- the main process
- the fib process

The main process has two threads:
- the main thread
- the callback thread (which handles the future object)
add_done_callback is a non-blocking function (the callback runs in another, concurrent thread of the same process, see below), so while fib(n) executes, the run function keeps going until the select statement, where run() gets stuck in the "while not tasks" loop, waiting for a polling/select event!
As I understand it, the callback passed to future.add_done_callback() simply runs in a thread other than the main thread, but within the same process (so communication is easy)! We have to be careful not to mix everything up: the job fib(n) is submitted to another process, but the callback is called in the main process, in another thread (I suppose, as already mentioned, that the return value of the "fib process" is communicated to the future object in the main process through a kind of "process queue").
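This can be checked with a small experiment. The following is only a sketch, not part of the original server: callback_context, on_done and the info dictionary are names made up for the illustration. It submits os.getpid to the pool to learn the worker's pid, then attaches a callback to a slow job (time.sleep) so that the future is certainly not finished when add_done_callback is called.

```python
import os
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def callback_context(executor_cls=ProcessPoolExecutor):
    """Record which process and thread run the job and the done-callback."""
    info = {"main_pid": os.getpid(), "main_thread": threading.get_ident()}
    called = threading.Event()

    def on_done(future):
        # Runs when the slow job finishes; record where we are.
        info["callback_pid"] = os.getpid()
        info["callback_thread"] = threading.get_ident()
        called.set()

    with executor_cls(max_workers=1) as pool:
        # The worker reports its own pid (os.getpid is picklable by reference).
        info["job_pid"] = pool.submit(os.getpid).result()
        # A slow job, so the future is still pending when the callback is added.
        slow = pool.submit(time.sleep, 0.3)
        slow.add_done_callback(on_done)
        called.wait(timeout=10)
    return info


if __name__ == "__main__":
    info = callback_context()
    print("job ran in another process:    ", info["job_pid"] != info["main_pid"])
    print("callback ran in main process:  ", info["callback_pid"] == info["main_pid"])
    print("callback ran in another thread:", info["callback_thread"] != info["main_thread"])
```

Passing ThreadPoolExecutor instead of the default makes the job itself run in the main process too, which is a quick way to compare the two pools.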
So even when run() is stuck in the select statement, the callback is executed concurrently and the "fib_handler" task is added!
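There is one case where the callback does not run in a separate thread: according to the concurrent.futures documentation, if the future has already completed when add_done_callback is called, the callback is called immediately, in the calling thread. A minimal sketch of that case (callback_on_finished_future is a hypothetical helper, and a thread pool is assumed only to keep the example light):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback_on_finished_future():
    """Add a callback to a future that is already done and see who runs it."""
    seen = []  # thread ids recorded by the callback
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(pow, 2, 5)
        future.result()  # wait until the job is finished
        future.add_done_callback(lambda f: seen.append(threading.get_ident()))
        # No waiting here: if the callback ran, it ran inside add_done_callback.
        ran_immediately = len(seen) == 1
    ran_in_caller = seen == [threading.get_ident()]
    return ran_immediately, ran_in_caller


if __name__ == "__main__":
    print(callback_on_finished_future())
```

In the server this case is unlikely for a long fib(n), but it means the callback should not assume it always runs outside the main thread.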
When the callback thread (in the main process) returns, only the main thread is left, still stuck in the select statement, waiting for a recv or send event. So only if another client connects will select return and run() leave the "while not tasks" loop (since the future_done callback has added a new task). This is what future_monitor is for: future_done sends a byte on future_notify, which makes future_event readable and wakes up select as soon as the future is done.
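In the original server, future_done sends a byte on a socketpair that future_monitor is waiting on. The sketch below isolates that trick under a few assumptions: wait_for_future and use_notify are made-up names, a thread pool with time.sleep stands in for the process pool running fib(n), and select gets a timeout (the real loop has none) so that the demonstration terminates.

```python
import select
import socket
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def wait_for_future(use_notify, delay=0.1, timeout=1.0):
    """Block in select() while a job runs; report whether select() woke up."""
    # Same role as future_notify / future_event in the original server.
    notify, event = socket.socketpair()
    tasks = deque()

    def future_done(future):
        tasks.append("fib_handler")  # stand-in for the suspended generator
        if use_notify:
            notify.send(b"x")  # makes `event` readable, so select() returns

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(time.sleep, delay)
        future.add_done_callback(future_done)
        # The event loop is blocked here, exactly like run() in the server.
        readable, _, _ = select.select([event], [], [], timeout)
        woke_up = event in readable
        if woke_up:
            event.recv(100)  # drain the byte, as future_monitor does
    notify.close()
    event.close()
    return woke_up, len(tasks)


if __name__ == "__main__":
    print("with notification:   ", wait_for_future(use_notify=True))
    print("without notification:", wait_for_future(use_notify=False))
```

In both runs the task ends up in the queue, but without the notification select only returns when its timeout expires; with no timeout, as in the server, it would wait for the next socket event.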