Skip to content
Advertisement

Dask Distributed: Reducing Multiple Dimensions into a Distance Matrix

I want to calculate a large distance matrix, based on a higher dimensional vector. For instance, I have 1000 instances each represented by 20 vectors of length 10. The distance between each two instances is given by the mean distance between each of the 20 vectors associated to each vector. So I want to go from a 1000 by 20 by 10 matrix to a 1000 by 1000 (lower-triangular) matrix. Because these calculations can get slow, I want to use Dask distributed to block the algorithm and spread it over several CPU’s. Below is how far I’ve gotten:

Preamble

import itertools
import random
import numpy as np
import dask.array
from dask.distributed import Client

The distance function is defined by

def distance(u, v):
    result = np.empty([int((len(u)*(len(u)+1))/2)], dtype=float)
    for i, j in itertools.product(range(len(u)),range(len(v))):
        if j <= i: 
            differences = []
            k = int(((i*(i+1))/2 +j-1)+1)
            for x,y in itertools.product(u[i], v[j]):
                difference = np.abs(np.array(x) - np.array(y)).sum(axis=1)
                differences.apply(difference)
            result[k] = np.mean(differences) 
    return result

and returns an array of length n*(n+1)/2 to describe the lower triangular matrix for this block of the distance matrix.

def distance_matrix(X):
    X = np.asarray(X, dtype=object)
    X = dask.array.from_array(X, (100, 20, 10)).astype(float)
    print("chunksize: ", X.chunksize)
    resulting_length = [int((X.chunksize[0]*(X.chunksize[0])+1)/2)]
    result = dask.array.map_blocks(distance, X, X, chunks=(resulting_length), drop_axis=[1,2], dtype=float)
    return result.compute()

I split up the input array in chunks and use dask.array.map_blocks to apply the distance calculation to all the blocks.

if __name__ == '__main__':
    workers = 6
    X = np.array([[[random.random() for _ in range(10)] for _ in range(20)] for _ in range(1000)])
    client = Client(n_workers=workers)
    results = similarity_matrix(X)
    client.close()
    print(results)

Unfortunately, this approach returns the wrong length of array at the end of the process. Would somebody to help me out here? I don’t have much experience in distributed computing.

Advertisement

Answer

I’m a big fan of dask, but this problem is way too small to need it. The runtime issue you’re seeing is because you are looping through each element in python rather than using vectorized operations in numpy.

As with many packages in python, numpy relies on highly efficient compiled code written in other, faster languages such as C to carry out array operations. When you do something like an array operation A + B numpy calls these fast routines once, and the array operations are carried out within a highly optimized C routine. There is overhead involved with making calls to other languages, but this is overwhelmed by the performance gain due to the single call to a very fast routine. If instead you loop over every element, adding cell-wise, you have a (slow) python process, and on each element, this calls the C code, which adds overhead for each element of the array. Because of this, you actually would be better off not using numpy if you’re going to do this once for each element.

To implement this in a vectorized manner, you can exploit numpy’s broadcasting rules to ensure the first dimensions of your two arrays expand to a new dimension. I don’t totally understand what’s going on in your distance function, but you could extend this simple version to do whatever you want:

In [1]: import numpy as np

In [2]: A = np.random.random((1000, 20))
   ...: B = np.random.random((1000, 20))

In [3]: distance = np.abs(A[:, np.newaxis, :] - B[np.newaxis, :, :]).sum(axis=-1)

In [4]: distance
Out[4]:
array([[7.22985776, 7.76185666, 5.61824886, ..., 7.62092039, 6.35189562,
        7.06365986],
       [5.73359499, 5.8422105 , 7.2644021 , ..., 5.72230353, 6.79390303,
        5.03074007],
       [7.27871151, 8.6856818 , 5.97489449, ..., 8.86620029, 7.49875638,
        6.57389575],
       ...,
       [7.67783107, 7.24419076, 4.17941596, ..., 8.68674754, 6.65078093,
        5.67279811],
       [7.1550136 , 6.10590227, 5.75417987, ..., 7.05953998, 5.8306628 ,
        6.55112672],
       [5.81748615, 6.79246838, 6.95053088, ..., 7.63994705, 6.77720511,
        7.5663236 ]])

In [5]: distance.shape
Out[5]: (1000, 1000)

The performance difference can be seen clearly against a looped implementation:

In [6]: %%timeit
   ...: np.abs(A[:, np.newaxis, :] - B[np.newaxis, :, :]).sum(axis=-1)
   ...:
   ...:
45 ms ± 326 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [7]: %%timeit
   ...: distances = np.empty((1000, 1000))
   ...: for i in range(1000):
   ...:     for j in range(1000):
   ...:         distances[i, j] = np.abs(A[i, :] - B[j, :]).sum()
   ...:
2.42 s ± 7.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

The looped version takes more than 50x as long!

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement