The following code has 5 workers; each one runs its own `worker_task()`:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result()
But inside each `worker_task()` I cannot tell which of the 5 workers is currently running it (its worker ID).
For example, I want to call `print('worker 3 has finished')` inside `worker_task()`, but I cannot, because `executor.submit` does not pass a worker ID to the task.
Any solutions?
Advertisement
Answer
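You can get the name of the worker thread that is running the current code with the `threading.current_thread()` function (read its `.name` attribute). `ThreadPoolExecutor` names its threads `<prefix>_<index>`, and its `thread_name_prefix` argument lets you choose the prefix, so the name doubles as a worker ID. Please find an example below: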
from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint

# imagine these are urls
URLS = list(range(100))


def do_some_work(url, a, b, delay=2):
    """Simulate a unit of work for *url* and report which thread ran it.

    Sleeps for *delay* seconds, then draws a random integer in ``[a, b]``.

    Returns:
        ``"<thread name>||: <url>_<number>"`` terminated by a newline.

    Raises:
        ValueError: if the random number drawn is 5.
    """
    sleep(delay)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    # The current thread's name identifies the pool worker running this task.
    # Thread.getName() is deprecated since Python 3.10; use the .name attribute.
    return f"{current_thread().name}||: {url}_{rand_num}\n"


def show_fut_results(fut: Future):
    """Done-callback: print the future's result, its error, or a cancellation note.

    The thread name printed is whichever thread invokes the callback
    (in CPython typically the worker that completed the future, or the
    caller of ``add_done_callback`` if the future was already done).
    """
    if fut.cancelled():
        # exception()/result() would raise CancelledError on a cancelled future.
        print(f"{current_thread().name}| Cancelled\n")
        return
    exc = fut.exception()  # fetch once instead of calling twice
    if exc is None:
        print(fut.result())
    else:
        print(f"{current_thread().name}| Error: {exc}\n")


def main(max_workers=10):
    """Submit every URL to a thread pool and print results as they complete."""
    # thread_name_prefix makes the worker names read "Worker_0", "Worker_1", ...
    with ThreadPoolExecutor(
        max_workers=max_workers, thread_name_prefix="Worker"
    ) as pool:
        for url in URLS:
            fut = pool.submit(do_some_work, url, 1, 10)
            fut.add_done_callback(show_fut_results)


if __name__ == '__main__':
    main()
If you want more control over the threads, use the `threading` module directly:
from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging

# imagine these are urls
URLS = [f"URL-{i}" for i in range(100)]
# number of worker threads
WORKER_NUM = 10


def do_some_work(url: str, a: int, b: int, delay: float = 2) -> str:
    """Simulate work on *url*: sleep *delay* seconds, then draw a number in [a, b].

    Returns:
        ``"<url> = <number>"``.

    Raises:
        ValueError: if the random number drawn is 5.
    """
    sleep(delay)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    return f"{url} = {rand_num}"


def thread_worker_func(q: Queue, a: int, b: int, delay: float = 2) -> None:
    """Target function for worker threads.

    Takes URLs from *q* and processes each with ``do_some_work`` until the
    ``None`` poison pill arrives. Errors from a single URL are logged and the
    worker keeps going; the thread name (set by the creator) shows up in the
    log format as the worker ID.
    """
    logging.info("Started working")
    while True:
        url = q.get()
        # poison pill - stop this worker thread
        if url is None:
            break
        try:
            r = do_some_work(url, a, b, delay)
        except ValueError as ex:
            logging.error(ex)
        except Exception:
            # Keep the worker alive, but record the full traceback.
            logging.exception("Unexpected error while processing %s", url)
        else:
            logging.info("Result: %s", r)
    logging.info("Finished working")


def main() -> None:
    """Start WORKER_NUM named threads, feed them URLS, then stop and join them."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    # Bounded queue: the producer blocks once 50 URLs are waiting.
    in_q: Queue = Queue(50)
    workers = [
        Thread(
            target=thread_worker_func,
            args=(in_q, 1, 10),
            name=f"MyWorkerThread-{i + 1}",
        )
        for i in range(WORKER_NUM)
    ]
    for w in workers:
        w.start()
    # start distributing tasks
    for url in URLS:
        in_q.put(url)
    # one poison pill per worker so every thread stops
    for _ in workers:
        in_q.put(None)
    # wait for worker threads to join the main thread
    logging.info("Main Thread waiting for Worker Threads")
    for w in workers:
        w.join()
    logging.info("Workers joined")
    logging.info("App finished")


if __name__ == '__main__':
    main()