I have a function with list-valued return values that I'm running through multiprocessing in Python, and I need to concatenate the results into 1D lists at the end. The following is sample code for demonstration:
import numpy as np
import multiprocessing as mp

N = 4
L = list(range(0, N))

def F(x):
    # build two lists of strings for each input x
    a = []
    b = []
    for t in range(0, 2):
        a.append('a' + str(t * x))
        b.append('b' + str(t * x))
    return a, b

pool = mp.Pool(mp.cpu_count())
a, b = zip(*pool.map(F, L))
pool.close()

print(a)
print(b)

A = np.concatenate(a)
B = np.concatenate(b)
print(A)
print(B)
The output for illustration is:
(['a0', 'a0'], ['a0', 'a1'], ['a0', 'a2'], ['a0', 'a3'])
(['b0', 'b0'], ['b0', 'b1'], ['b0', 'b2'], ['b0', 'b3'])
['a0' 'a0' 'a0' 'a1' 'a0' 'a2' 'a0' 'a3']
['b0' 'b0' 'b0' 'b1' 'b0' 'b2' 'b0' 'b3']
The problem is that the list L that I'm processing is pretty huge, and the concatenations at the end take so long that they considerably reduce the advantage over serial processing.

Is there some clever way to avoid the concatenation, or alternatively a faster method to perform it? I've been fiddling with queues, but that approach seems quite slow.
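For reference, the queue-based variant I tried looks roughly like this (a sketch of the idea, not my exact code):

import multiprocessing as mp

def F_queue(x, q):
    # same work as F, but push the results into a shared queue instead of returning them
    a = ['a' + str(t * x) for t in range(2)]
    b = ['b' + str(t * x) for t in range(2)]
    q.put((a, b))

if __name__ == "__main__":
    N = 4
    q = mp.Queue()
    procs = [mp.Process(target=F_queue, args=(x, q)) for x in range(N)]
    for p in procs:
        p.start()
    A, B = [], []
    for _ in procs:
        a, b = q.get()  # drain the queue: one get() per worker's put()
        A.extend(a)
        B.extend(b)
    for p in procs:
        p.join()
    print(A)
    print(B)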
Note: This seems to be a question similar to Add result from multiprocessing into array.
Answer
If the desired output is an input suitable for creating a scipy.sparse.coo_matrix, I would take a very different approach: don't return anything; instead, create shared objects that each process can modify directly.

What you need to create a coo_matrix is an array of the data values, an array of the row indices, and an array of the column indices (unless you already have another sparse or dense matrix). I would create three shared arrays that each process can write its results into directly, using the index of each entry from L. This even allows out-of-order execution, so you can use imap_unordered instead for better speed:
from multiprocessing.pool import Pool
from multiprocessing.sharedctypes import RawArray
from random import random, randint  # bogus data for testing
import numpy as np
from ctypes import c_int, c_float
from scipy.sparse import coo_matrix

# pool worker globals are only global to that process
worker_globals = {}

def init_worker(data_array, row_array, col_array):
    worker_globals['data'] = np.frombuffer(data_array, dtype=c_float)
    worker_globals['row'] = np.frombuffer(row_array, dtype=c_int)
    worker_globals['col'] = np.frombuffer(col_array, dtype=c_int)

def worker_func(tup):
    i, x = tup  # enumerate yields a tuple: the index, then the value
    # no mutexes needed: each slot i is written exactly once, by a single process
    # calculate your data, row, and column, and write back to the shared arrays
    worker_globals['data'][i] = random()
    worker_globals['row'][i] = x
    worker_globals['col'][i] = randint(0, 999)  # must stay below the matrix dimension (1000)

if __name__ == "__main__":
    L = list(range(100, 0, -1))  # some data in L

    data_array = RawArray(c_float, len(L))
    row_array = RawArray(c_int, len(L))
    col_array = RawArray(c_int, len(L))

    with Pool(initializer=init_worker, initargs=(data_array, row_array, col_array)) as p:
        for _ in p.imap_unordered(worker_func, enumerate(L)):
            pass

    d = np.frombuffer(data_array, dtype=c_float)
    r = np.frombuffer(row_array, dtype=c_int)
    c = np.frombuffer(col_array, dtype=c_int)

    mat = coo_matrix((d, (r, c)), shape=(1000, 1000))
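One property of this approach worth noting (my reading of the code above, not something spelled out in the original): np.frombuffer does not copy, so d, r, and c in the parent are views over the very buffers the workers wrote into, and the expensive concatenation step disappears entirely. A quick sanity check, assuming the script above has just run:

# the arrays are views, not copies: they do not own their data
print(d.flags['OWNDATA'], r.flags['OWNDATA'], c.flags['OWNDATA'])  # False False False
# the assembled sparse matrix holds one entry per element of L
print(mat.nnz, mat.shape)  # 100 (1000, 1000)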
By the way: you should also always use an if __name__ == "__main__": guard when using multiprocessing. It is suggested everywhere, and it is required on Windows.
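A minimal illustration of why (this toy example is mine, not from the original answer): under the spawn start method, each child process re-imports the main module, so any top-level pool creation would run again in every child.

import multiprocessing as mp

def work(x):
    return x * x

if __name__ == "__main__":
    # Without this guard, under the spawn start method (the default on
    # Windows, and on macOS since Python 3.8) each child would re-import
    # this module and try to create its own Pool, raising a RuntimeError
    # instead of doing useful work.
    with mp.Pool() as pool:
        print(pool.map(work, range(4)))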