I am trying to run my simulations in a thread pool and store the results of each repetition in a global numpy array. However, I run into problems doing that, and I am observing really interesting behavior with the following simplified code (Python 3.7):
```python
import random

import numpy as np
from multiprocessing import Pool, Lock

log_mutex = Lock()
repetition_count = 5
thread_count = 5
data_array = np.zeros(shape=(repetition_count, 3, 200), dtype=float)

def record_results(repetition_index, data_array, log_mutex):
    log_mutex.acquire()
    print("Start record {}".format(repetition_index))
    # Do some stuff and modify data_array, e.g.:
    data_array[repetition_index, 0, 53] = 12.34
    print("Finish record {}".format(repetition_index))
    log_mutex.release()

def run(repetition_index):
    global log_mutex
    global data_array
    # do some simulation
    record_results(repetition_index, data_array, log_mutex)

if __name__ == "__main__":
    random.seed()
    with Pool(thread_count) as p:
        print(p.map(run, range(repetition_count)))
```
The issue is: I get the correct "Start record & Finish record" outputs, e.g. Start record 1 … Finish record 1. However, the slices of the numpy array modified by each thread are not kept in the global variable. In other words, the elements that were modified by thread 1 are still zero afterwards, and thread 4 likewise overwrites different parts of the array without effect.
One additional remark: the address of the global array, which I retrieve with
print(hex(id(data_array)))
is the same for all threads inside their log_mutex.acquire() ... log_mutex.release() blocks.
Am I missing something? Is there a separate copy of the global data_array stored for each thread? I am observing behavior like that, but it should not happen when I use the global keyword, or am I wrong?
Answer
Looks like you're running the run function using multiple processes, not multiple threads. Try something like this instead:
```python
import numpy as np
from threading import Thread, Lock

log_mutex = Lock()
repetition_count = 5
data_array = np.zeros(shape=(repetition_count, 3, 200), dtype=float)

def record_results(repetition_index, data_array, log_mutex):
    log_mutex.acquire()
    print("Start record {}".format(repetition_index))
    # Do some stuff and modify data_array, e.g.:
    data_array[repetition_index, 0, 53] = 12.34
    print("Finish record {}".format(repetition_index))
    log_mutex.release()

def run(repetition_index):
    global log_mutex
    global data_array
    record_results(repetition_index, data_array, log_mutex)

if __name__ == "__main__":
    threads = []
    for i in range(repetition_count):
        t = Thread(target=run, args=[i])
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
```
Update:
To do this with multiple processes, you would need to use multiprocessing.RawArray to instantiate your array; the size of the array is the product repetition_count * 3 * 200. Within each process, create a view on the array using np.frombuffer, and reshape it accordingly. While this will be very fast, I discourage this style of programming as it relies on global shared memory objects, which are error-prone in larger programs.
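To make that concrete, here is a minimal sketch of the RawArray approach, reusing the names from your snippet; the initializer function and the shared_buffer global are illustrative names, not part of your original code:

```python
import numpy as np
from multiprocessing import Pool, RawArray

repetition_count = 5
shape = (repetition_count, 3, 200)

# Module-level handle to the shared memory, set by the pool initializer.
shared_buffer = None

def init_worker(buffer):
    global shared_buffer
    shared_buffer = buffer

def run(repetition_index):
    # Re-create a numpy view on the shared memory inside each process.
    data_array = np.frombuffer(shared_buffer, dtype=np.float64).reshape(shape)
    data_array[repetition_index, 0, 53] = 12.34

if __name__ == "__main__":
    # 'd' is a C double, matching np.float64.
    buffer = RawArray('d', repetition_count * 3 * 200)
    with Pool(initializer=init_worker, initargs=(buffer,)) as p:
        p.map(run, range(repetition_count))
    result = np.frombuffer(buffer, dtype=np.float64).reshape(shape)
```

All processes write into the same underlying buffer, so the writes survive, but every worker now depends on module-level shared state.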
If possible, I suggest removing the global data_array and instead instantiating an array in each call to record_results, which you would return from run. The p.map call will then return a list of arrays, which you can convert to a numpy array, recovering the shape and contents of the global data_array in your original implementation. This incurs a communication cost, but it is a cleaner approach to managing concurrency and eliminates the need for locks.
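A minimal sketch of that return-based approach, again reusing the names from your snippet:

```python
import numpy as np
from multiprocessing import Pool

repetition_count = 5

def record_results(repetition_index):
    # Each call builds its own slice instead of writing to a shared global.
    result = np.zeros(shape=(3, 200), dtype=float)
    result[0, 53] = 12.34
    return result

def run(repetition_index):
    # do some simulation, then return this repetition's results
    return record_results(repetition_index)

if __name__ == "__main__":
    with Pool() as p:
        results = p.map(run, range(repetition_count))
    # Stack the per-repetition slices back into the original layout.
    data_array = np.array(results)  # shape (repetition_count, 3, 200)
```

No lock is needed because no two workers ever touch the same object; the parent process assembles the final array from the returned pieces.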
It's generally a good idea to minimize inter-process communication, but unless performance is critical, I don't think shared memory is the right solution here. With p.map, you'll want to avoid returning large objects, but the objects in your snippet are very small: each returned slice is 3 × 200 doubles, i.e. 600 * 8 bytes.