Can’t understand how D. Beazley monitors futures in his code on concurrency

Trying to understand concurrency based on generators, I am following the talk by D. Beazley. I don’t understand the purpose of future_monitor and I am wondering what the consequences of taking the function out would be. Here is his implementation of the asynchronous server and, right after it, my implementation without the future_monitor function.

Perhaps I misunderstand how futures and add_done_callback behave with the concurrent.futures library.

I do not know:

  1. what runs in the main process and what is delegated to another process;
  2. how add_done_callback interacts with the main process: will the callback appear in the middle of the main thread whenever the future is done?

As I understand it:

  • the function submitted to the pool is executed in a process other than the main process,
  • the return value of that other process is registered in the future object when it returns (I imagine a kind of message queue between the two processes, through which the future gets the result passed back by the third process executing fib(n)),
  • add_done_callback is a non-blocking function that will suspend the main thread when the future is done and call the callback immediately (suspending the main process?); see the small probe right after this list.
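To check that last point, here is a minimal probe (my own sketch, not Beazley's code; slow_job and on_done are placeholder names): it submits a slow job to a process pool, registers a callback, and keeps the main thread busy counting, so I can see whether the main thread is ever suspended and in which thread the callback ends up running.

# probe_callback.py -- my own sketch, not from the talk
import time
import threading
from concurrent.futures import ProcessPoolExecutor

def slow_job(n):
    time.sleep(1)            # stands in for an expensive fib(n)
    return n * n

def on_done(future):
    # Called once the future has a result, in a thread of *this* process
    print('callback runs in thread:', threading.current_thread().name,
          'result:', future.result())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(2)
    future = pool.submit(slow_job, 7)
    future.add_done_callback(on_done)   # returns immediately, does not block
    for i in range(4):
        print('main thread keeps running:', i)
        time.sleep(0.5)
    pool.shutdown()

If add_done_callback really suspended the main thread, the counting prints would pause; I expect instead that they keep coming and that the callback shows up in a non-main thread.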
# server.py
# Fib microservice

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

from socket import *
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool
import os
import psutil

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')

def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue  
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print("Connection", addr,client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client   
        req = client.recv(100)# blocking
        print(f'Receive {req} from Client {client.getpeername()}  ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()    #  Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send',client
        client.send(resp)    # blocking
        print(f'Send {resp} to  Client {client.getpeername()}  ')
    print("Closed")

if __name__=='__main__':
    #Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    future_notify, future_event = socketpair()
    tasks = deque()

    #Main tasks
    tasks.append(future_monitor())
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    #Run Event loop
    run()
# server.py
# Fib microservice

from socket import *
from fib import fib
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))


def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue  
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print("Connection", addr,client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client   
        req = client.recv(100)# blocking
        print(f'Receive {req} from Client {client.getpeername()}  ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()    #  Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send',client
        client.send(resp)    # blocking
        print(f'Send {resp} to  Client {client.getpeername()}  ')
    print("Closed")

if __name__=='__main__':
    #Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    tasks = deque()

    #Main tasks
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    #Run Event loop
    run()


Answer

I have perhaps found some elements of the answer.

To be clear, we will speak about two processes:

  • the main process
  • the fib process

The main process has two threads:

  • the main thread
  • the callback thread (which handles the future object)

add_done_callback is a non-blocking function (the callback runs in another concurrent thread, but in the same process, see below). So while fib(n) executes, the run() function keeps progressing until it reaches the select statement, where it gets stuck in the “while not tasks” loop waiting for a polling/select event.

As I understand it, the callback registered with future.add_done_callback() runs in another thread than the main thread, but stays in the same process (so communication is easy). We have to be careful not to mix everything up: the job fib(n) is submitted to another process, but the callback will be called in the main process, in another thread (I suppose, as already mentioned, that the return value of the “fib process” is communicated back by a kind of process queue to the future object in the main process).
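A quick way to check this claim (my own sketch, not code from the talk; work and done are placeholder names): print the PID inside the submitted function and inside the callback. The worker should report a different PID, while the callback should report the PID of the main process, just from a different thread.

# pid_probe.py -- my own sketch to see which process and thread run what
import os
import time
import threading
from concurrent.futures import ProcessPoolExecutor

def work(n):
    time.sleep(0.5)                      # leave time to register the callback
    print('worker pid:', os.getpid())    # a different pid: the "fib process"
    return n + 1

def done(future):
    print('callback pid:', os.getpid(),  # same pid as the main process
          'thread:', threading.current_thread().name)

if __name__ == '__main__':
    print('main pid:', os.getpid(),
          'thread:', threading.current_thread().name)
    with ProcessPoolExecutor(1) as pool:
        f = pool.submit(work, 41)
        f.add_done_callback(done)
        f.result()                       # wait for everything to finish

If that is right, “worker pid” differs from “main pid”, while “callback pid” matches it and only the thread name differs.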

So even when run() is stuck in the select statement, the callback is executed concurrently and the “fib_handler” task is added back to the task queue.

When the callback thread (in the main process) returns, only the main thread is left, still stuck in the select statement, waiting for a recv/send event. So only when another client connects does select return, and the loop gets out of the “while not tasks” loop, since the future_done callback has already added a new task.
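And that, as far as I can tell, is exactly what future_monitor and the socketpair are for in Beazley's version: in future_done the callback also does future_notify.send(b'x'), a classic self-pipe trick that gives select() something to wake up on, so the loop resumes immediately instead of waiting for the next client. Here is a stripped-down sketch of that idea (my own code, not the talk's):

# wakeup_sketch.py -- my own stripped-down version of the self-pipe idea
import threading
import time
from socket import socketpair
from select import select

notify, event = socketpair()       # plays the role of future_notify / future_event

def fake_callback():
    time.sleep(1)                  # pretend fib(n) just finished in the pool
    print('callback: sending wake-up byte')
    notify.send(b'x')              # this is what future_done() does

threading.Thread(target=fake_callback).start()

print('event loop: blocked in select(), no client activity at all')
can_recv, _, _ = select([event], [], [])   # wakes up because of the byte
event.recv(100)                            # drain it, like future_monitor does
print('event loop: woken up, the ready task can now be scheduled')

Without future_monitor and the notify socket, nothing comparable wakes select() up, which matches the behaviour described above: the ready task sits in the queue until some unrelated client activity arrives.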
