Skip to content
Advertisement

Identify current thread in concurrent.futures.ThreadPoolExecutor

the following code has 5 workers …. each opens its own worker_task()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}

    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try: data = future.result()

BUT ….. inside each worker_task() …… I cannot identify … which of the 5 workers is currently being used (Worker_ID)

If I want to print('worker 3 has finished') inside worker_task() ….. I cannot do this because executor.submit does not allow

Any solutions?

Advertisement

Answer

You can get name of worker thread with the help of threading.current_thread() function. Please find some example below:

from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint

# imagine these are urls
URLS = [i for i in range(100)]


def do_some_work(url, a, b):
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    r = f"{current_thread().getName()}||: {url}_{rand_num}n"
    return r


def show_fut_results(fut: Future):
    """Callback for future shows results or shows error"""
    if not fut.exception():
        print(fut.result())
    else:
        print(f"{current_thread().getName()}|  Error: {fut.exception()}n")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        for i in URLS:
            _fut = pool.submit(do_some_work, i, 1, 10)
            _fut.add_done_callback(show_fut_results)

If you want more control over threads, use threading module:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging

# imagine these are urls
URLS = [f"URL-{i}" for i in range(100)]

# number of worker threads
WORKER_NUM = 10


def do_some_work(url: str, a: int, b: int) -> str:
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    r = f"{url} = {rand_num}"
    return r


def thread_worker_func(q: Queue, a: int, b: int) -> None:
    """Target function for Worker threads"""
    logging.info("Started working")
    while True:
        try:
            url = q.get()

            # if poison pill - stop worker thread
            if url is None:
                break

            r = do_some_work(url, a, b)
            logging.info(f"Result: {r}")
        except ValueError as ex:
            logging.error(ex)
        except Exception as ex:
            logging.error(f"Unexpected error: {ex}")

    logging.info("Finished working")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    in_q = Queue(50)
    workers = [
        Thread(target=thread_worker_func, args=(in_q, 1, 10, ), name=f"MyWorkerThread-{i+1}")
        for i in range(WORKER_NUM)
    ]
    [w.start() for w in workers]

    # start distributing tasks
    for _url in URLS:
        in_q.put(_url)

    # send poison pills to worker-threads
    for w in workers:
        in_q.put(None)

    # wait worker thread to join Main Thread
    logging.info("Main Thread waiting for Worker Threads")
    [w.join() for w in workers]

    logging.info("Workers joined")
    sleep(10)
    logging.info("App finished")
User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement