Can anyone explain Python multiprocessing queue communication in detail? What exactly happens to an object when it is put into the queue? I have a snippet of code that is confusing me.
import time
import numpy as np
from multiprocessing import Queue, Process

def task(queue_in, queue_out):
    mutural_np = np.zeros((10, 2))
    while True:
        msg = queue_in.get()
        res = []
        i = 0
        for i in range(msg):
            newnp = np.ones((1, 2)) * (msg - i)
            mutural_np[i:i+1] = newnp
        res = mutural_np[:i]
        print("===> put: ", res)
        queue_out.put(res)

if __name__ == "__main__":
    queue_in = Queue(10)
    queue_out = Queue(1)
    p1 = Process(target=task, args=(queue_in, queue_out))
    p1.start()
    for i in range(5):
        queue_in.put(i + 1)
    while True:
        msg = queue_out.get()
        time.sleep(0.5)
        print("***> out: ", msg)
and the output is:
===> put: []
===> put: [[2. 2.]]
===> put: [[3. 3.] [2. 2.]]
***> out: []
===> put: [[4. 4.] [3. 3.] [2. 2.]]
***> out: [[3. 3.]]
===> put: [[5. 5.] [4. 4.] [3. 3.] [2. 2.]]
***> out: [[4. 4.] [3. 3.]]
***> out: [[5. 5.] [4. 4.] [3. 3.]]
***> out: [[5. 5.] [4. 4.] [3. 3.] [2. 2.]]
Why do I get this inconsistency?
I see the docs saying:
"When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe."
“The pickle module keeps track of the objects it has already serialized so that later references to the same object won’t be serialized again”
From my understanding, the object should be pickled (and therefore effectively frozen) at the moment I put it in the queue, but it seems the pickling actually happens later, when the data is flushed out.
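Here is a smaller, timing-dependent sketch of what I mean (the names are mine, and the result can differ between runs depending on when the background thread actually pickles the object):

import numpy as np
from multiprocessing import Process, Queue

def consumer(q):
    # May print [0. 0. 0.] or [99. 99. 99.], depending on whether the producer's
    # feeder thread pickled the array before or after the mutation below.
    print("consumer got:", q.get())

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=(q,))
    p.start()
    data = np.zeros(3)
    q.put(data)      # put() returns immediately; pickling happens in a background thread
    data[:] = 99     # mutate right after put()
    p.join()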
Answer
I think I got the answer by reading the source code.
When an object is put on a multiprocessing queue, put() only enqueues a reference and returns immediately; a background feeder thread pickles the object and writes it to the pipe later. In the code above, res is a view into mutural_np, which the worker keeps overwriting, so by the time the feeder thread serializes it the contents may already have changed. So, to be safe, copy the object before putting it in the queue.
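For example, here is a sketch of one possible fix to the task() function from the question (reusing its imports and __main__ block): copying the slice means the feeder thread pickles a private snapshot instead of a live view of mutural_np.

def task(queue_in, queue_out):
    mutural_np = np.zeros((10, 2))
    while True:
        msg = queue_in.get()
        i = 0
        for i in range(msg):
            mutural_np[i:i+1] = np.ones((1, 2)) * (msg - i)
        # Copy the slice: queue_out.put() returns before the data is pickled, so a
        # plain view would still reflect later writes to mutural_np at serialization time.
        res = mutural_np[:i].copy()
        print("===> put: ", res)
        queue_out.put(res)

The same applies to ordinary mutable Python objects (lists, dicts): take a copy, e.g. with copy.deepcopy(), if the object may be modified after put().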