Skip to content
Advertisement

create threads within processes and share a queue between the threads

I am trying to create 3 threads within each of 2 processes and share a queue of the type multiprocessing.JoinableQueue among all threads. The worker_func function simply creates the threads while the thread_func function prints out the values it gets from the queue. The program gets stuck somewhere in the time.sleep or in the get() method of queue. What am I doing wrong? I am running on a Windows computer.

import threading
from multiprocessing import Pool, Manager, JoinableQueue
import multiprocessing
from threading import Thread
import time


def thread_func(q, disp_lock):
    with disp_lock:
        print('thread ', threading.current_thread().name, ' in process ', multiprocessing.current_process().name ,
              ' reporting for duty')
    while True:
        time.sleep(0.1)
        try:
            val = q.get_nowait()
            with disp_lock:
                print('thread ', threading.current_thread().name, ' in process ', multiprocessing.current_process().name , ' got value: ',val)
            q.task_done()
        except:
            with disp_lock:
                print('queue is empty: ', q.qsize())

def worker_func(num_threads, q, disp_lock):

    threads = []
    for i in range(num_threads):
        thread = Thread(target= thread_func, args=( q, disp_lock,))
        thread.daemon = True
        thread.start()

if __name__ == "__main__":

    manager = Manager()
    lock    = manager.Lock()

    q1 = JoinableQueue()#manager.Queue()
    q1_length = 20

    for i in range(q1_length):
        q1.put(i)
        
    processes = []
    num_processes = 2   # 2 processes
    num_threads = 3
    for _ in range(num_processes):
        p = multiprocessing.Process(target=worker_func, args=( num_threads, q1, lock, )) # create a new Process
        p.daemon = True
        p.start()
        processes.append(p)

    q1.join()

Advertisement

Answer

You are not allowing the threads to complete their work. Either set them as non-daemon, or explicitly wait for them to join:

def worker_func(num_threads, q, disp_lock):
    threads = []
    for i in range(num_threads):
        thread = Thread(target=thread_func, args=(q, disp_lock,))
        thread.daemon = True
        thread.start()
        
        threads.append(thread)

    # Wait for them to finish
    for thread in threads:
        thread.join()
Advertisement