
Am I parallelizing this right?

I basically have to obtain a certain probability distribution by sampling from a set of 5000 matrices. So I just need to count how many times element X occurs in the position (i,j) of these 5000 matrices. Then I save these values and counts in a dictionary.
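For context, a minimal serial version of this counting could look like the following sketch (it assumes the fileK.txt naming used in my code below, with one dictionary per position):

from collections import defaultdict

import numpy as np

# counts[(i, j)][x] = how many of the 5000 matrices have value x at position (i, j)
counts = defaultdict(lambda: defaultdict(int))
for k in range(5000):
    matrix = np.loadtxt("file" + str(k) + ".txt", delimiter=",")
    for i in range(306):
        for j in range(306):
            counts[(i, j)][matrix[i, j]] += 1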

That said, I thought it could be a good idea to parallelize my code, since a serial version would run incredibly slowly. The code is the following:

import multiprocessing as mp
import numpy as np


def func(N):
    d = {}

    filenames = ["file" + str(k) + ".txt" for k in range(N, N + 1000)]
    # each of these files contains a 306x306 matrix
    for i in range(306):
        data = np.vstack([np.loadtxt(f, delimiter=",", usecols=i) for f in filenames])
        for j in range(306):
            values, counts = np.unique(data.T[j], return_counts=True)
            for v in values:  # use a fresh name here; "i" is already the outer loop variable
                d[v] = counts[np.where(values == v)]
    return d


if __name__ == "__main__":
    N = mp.cpu_count()
    with mp.Pool(processes=N) as p:
        results = p.map(func, range(0, 5000, 1000))  # one task per block of 1000 files

Since this is my first time parallelizing a function, I would like some feedback. Also, it is still slow because it rebuilds a 1000×306 array for each of the 306 columns (re-reading every file each time), so any advice on how to improve it would be very welcome.


Answer

Based on this description:

how many times element X occurs in the position (i,j) of these 5000 matrices

I would restructure your code to return a 306×306 array of dictionaries, where each dictionary has a key for every value occurring at that position and, as the corresponding entry, the number of times that value occurs. You can then generate the data for a subset of the files in parallel and merge the results at the end. You should raise chunksize to load as many files at once as you have RAM for, to reduce the number of times you have to loop manually over the array indices. Re-ordering the data into "Fortran" order should make access to arr[:,i,j] more efficient, so the calls to np.unique will be faster.
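The layout point can be sanity-checked with a quick, illustrative timing (a smaller stand-in shape is used here to keep memory modest; exact numbers will vary by machine):

import timeit

import numpy as np

arr_c = np.random.rand(1000, 100, 100)  # C order: arr_c[:, i, j] is a strided gather
arr_f = np.asfortranarray(arr_c)        # Fortran order: arr_f[:, i, j] is contiguous

print(timeit.timeit(lambda: np.unique(arr_c[:, 5, 5], return_counts=True), number=100))
print(timeit.timeit(lambda: np.unique(arr_f[:, 5, 5], return_counts=True), number=100))

The restructured code: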

import multiprocessing as mp
from collections import defaultdict

import numpy as np

arr_shape = (5, 5)  # (306, 306) for your data; small here for illustration
chunksize = 10      # number of files to load per task (as many as RAM allows)
start_n = 0
end_n = 100

def func(bounds):
    # unpack the (start, end) file indices from the input tuple
    start, end = bounds
    filenames = ["file" + str(k) + ".txt" for k in range(start, end)]
    
    #load and stack all the arrays into single array
    arr = np.stack([np.loadtxt(f, delimiter=",") for f in filenames])
    #re-order data in memory for efficient access along 0th axis (not needed, but likely faster)
    arr = np.asfortranarray(arr) 
    
    res = []
    #the more arrays you can load at once (chunksize), the fewer times we have to go through this inefficient loop
    for i in range(arr_shape[0]):
        res.append(list())
        for j in range(arr_shape[1]):
            #each res[i][j] will be a tuple of (values, counts)
            res[i].append(np.unique(arr[:,i,j], return_counts=True))
    return res

if __name__ == "__main__":

    with mp.Pool() as p:
        
        #build tuples of (start, end) for chunks of arbitrary size
        chunks = []
        for start in range(start_n, end_n, chunksize):
            end = min(start + chunksize, end_n)
            chunks.append((start, end))
            
        #build array of dicts to merge results into
        d = []
        for i in range(arr_shape[0]):
            d.append(list())
            for j in range(arr_shape[1]):
                #each d[i][j] will be a dict of d[value] = count
                d[i].append(defaultdict(int)) #empty values default to 0
                
        #call our "func" in parallel, and get any results as they come in.
        for res in p.imap_unordered(func, chunks):
            #merge the results into d
            for i in range(arr_shape[0]):
                for j in range(arr_shape[1]):
                    #recall result is array of tuples of (values, counts). zip() is an easy way to get them in pairs
                    for value, count in zip(res[i][j][0], res[i][j][1]):
                        d[i][j][value] += count
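Since the end goal is a probability distribution, you could normalize the merged counts once at the end. A sketch, continuing inside the main block (the divisor is the total number of matrices processed):

        # convert counts to probabilities: each position's counts sum to n_files
        n_files = end_n - start_n
        probs = [[{value: count / n_files for value, count in d[i][j].items()}
                  for j in range(arr_shape[1])]
                 for i in range(arr_shape[0])]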