I need to obtain a probability distribution from a sample of 5000 matrices. Concretely, for each position (i, j) I have to count how many times each element X occurs at that position across the 5000 matrices, and then save the values and their counts in a dictionary.
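To make the counting step concrete: for a single position (i, j), `np.unique` with `return_counts=True` already gives the value/count pairs, so the task is really just repeating this for every position across the stack of matrices. A toy sketch (small stand-in arrays instead of the real 306×306 files):

```python
import numpy as np

# Toy stand-in: 3 "matrices" of shape 2x2 stacked along axis 0.
stack = np.array([[[1, 2], [3, 4]],
                  [[1, 2], [3, 5]],
                  [[7, 2], [3, 4]]])

# How often each value occurs at position (0, 0) across the stacked matrices.
values, counts = np.unique(stack[:, 0, 0], return_counts=True)
d = dict(zip(values, counts))  # {1: 2, 7: 1}
```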
Because a serial implementation would run incredibly slowly, I thought it would be a good idea to parallelize my code. The code is the following:
```python
import multiprocessing as mp
import numpy as np

def func(N):
    d = {}
    filenames = ["file" + str(k) + ".txt" for k in range(N, N + 1000)]  ## each of these files is a 306x306 matrix
    for i in range(306):
        data = np.vstack([np.loadtxt(f, delimiter=",", usecols=i) for f in filenames])
        for j in range(306):
            values, counts = np.unique(data.T[j], return_counts=True)
            for i in values:
                d[i] = counts[np.where(values == i)]
    return d

if __name__ == "__main__":
    N = mp.cpu_count()
    with mp.Pool(processes=N) as p:
        results = p.map(func, [m for m in range(1000, 5000, 1000)])
```
Since this is my first time parallelizing a function, I would like some feedback. Also, the code is still slow because it has to load and stack a 1000×306 array on every iteration of the outer loop, so any advice on how to improve it would be very welcome.
Answer
Based on this description:
> how many times element X occurs in the position (i,j) of these 5000 matrices
I would restructure your code to return a 306×306 array of dictionaries, where each dictionary has a key for every value that occurs at that position and, as the value, the number of times it occurs. You can then generate the data for a subset of the files in parallel and merge the results at the end. You should adjust `chunksize` to load as many files at once as you have RAM for, to reduce the number of times you have to loop manually over the array indices. Re-ordering the data into "Fortran" order should make accessing `arr[:, i, j]` more efficient (the calls to `np.unique` will be faster).
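As a quick aside before the full code: the reason `np.asfortranarray` helps is that, with the files stacked along axis 0, Fortran order makes each `arr[:, i, j]` slice contiguous in memory, so each `np.unique` call reads sequential memory instead of striding across the whole array. A standalone sketch with toy shapes (not the real 306×306 data):

```python
import numpy as np

# Stand-in stack of 10 small "files"; the real shape would be (n_files, 306, 306).
arr_c = np.stack([np.random.rand(4, 4) for _ in range(10)])  # default C (row-major) order
arr_f = np.asfortranarray(arr_c)                             # same data, Fortran order

# Slicing along axis 0 at a fixed (i, j) is only contiguous in the Fortran copy.
print(arr_c[:, 1, 2].flags['C_CONTIGUOUS'])  # False: elements are 4*4 doubles apart
print(arr_f[:, 1, 2].flags['C_CONTIGUOUS'])  # True: elements sit next to each other
```

With that in mind, the restructured version: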
```python
import multiprocessing as mp
from collections import defaultdict

import numpy as np

arr_shape = (5, 5)  # (306, 306) for the real data
chunksize = 10      # load chunks of 10 files at a time (as many as RAM allows)
start_n = 0
end_n = 100

def func(N):
    # unpack args from input tuple
    filenames = ["file" + str(k) + ".txt" for k in range(N[0], N[1])]
    # load and stack all the arrays into a single array
    arr = np.stack([np.loadtxt(f, delimiter=",") for f in filenames])
    # re-order data in memory for efficient access along 0th axis (not needed, but likely faster)
    arr = np.asfortranarray(arr)
    res = []
    # the more arrays you can load at once (chunksize), the fewer times we have to go through this inefficient loop
    for i in range(arr_shape[0]):
        res.append(list())
        for j in range(arr_shape[1]):
            # each res[i][j] will be a tuple of (values, counts)
            res[i].append(np.unique(arr[:, i, j], return_counts=True))
    return res

if __name__ == "__main__":
    with mp.Pool() as p:
        # build tuples of (start, end) for chunks of arbitrary size
        chunks = []
        for start in range(start_n, end_n, chunksize):
            if start + chunksize > end_n:
                end = end_n
            else:
                end = start + chunksize
            chunks.append((start, end))

        # build array of dicts to merge results into
        d = []
        for i in range(arr_shape[0]):
            d.append(list())
            for j in range(arr_shape[1]):
                # each d[i][j] will be a dict of d[value] = count
                d[i].append(defaultdict(int))  # empty values default to 0

        # call our "func" in parallel, and collect the results as they come in
        for res in p.imap_unordered(func=func, iterable=chunks):
            # merge the results into d
            for i in range(arr_shape[0]):
                for j in range(arr_shape[1]):
                    # recall each result is an array of tuples of (values, counts); zip() is an easy way to get them in pairs
                    for value, count in zip(res[i][j][0], res[i][j][1]):
                        d[i][j][value] += count
```
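Since the end goal in the question is a probability distribution rather than raw counts, you may also want a final normalization step once the merge loop has finished. A small sketch, reusing the `d` built above:

```python
# Turn the merged counts at one position (i, j) into a probability distribution.
i, j = 0, 0
total = sum(d[i][j].values())
probabilities = {value: count / total for value, count in d[i][j].items()}
```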