
High memory allocation when using dask.bag.map

I am using Dask to extend the items of a dask bag with information from an external, previously computed object arg. Dask seems to allocate a separate copy of arg for each partition at the start of the computation.

Is there a workaround that prevents Dask from duplicating arg many times over (and allocating a lot of memory in the process)?

Here is a simplified example:

from pathlib import Path

import numpy as np
import pandas as pd
from dask import bag

in_dir = Path.home() / 'in_dir'
out_dir = Path.home() / 'out_dir'

in_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)

n_files = 100
n_lines_per_file = int(1e6)

df = pd.DataFrame({
    'a': np.arange(n_lines_per_file).astype(str)
})

for i in range(n_files):
    df.to_csv(in_dir / f'{i}.txt', index=False, header=False)


def mapper(x, arg):
    y = x  # map x to y using arg
    return y


arg = np.zeros(int(1e7))

(
    bag
    .read_text(str(in_dir / '*.txt'))
    .map(mapper, arg)
    .to_textfiles(str(out_dir / '*.txt'))
)


Answer

One strategy for dealing with this is to scatter your data to workers first:

import dask.bag
import dask.distributed
import numpy as np

client = dask.distributed.Client()

arg = np.zeros(int(1e7))
# Broadcast one copy of arg to every worker up front; the map call below
# then passes the resulting future instead of re-serializing the raw array.
arg_f = client.scatter(arg, broadcast=True)

(
    dask.bag
    .read_text(str(in_dir / '*.txt'))
    .map(mapper, arg_f)
    .to_textfiles(str(out_dir / '*.txt'))
)

This sends a copy of the data to each worker, but does not create a copy for each task.
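If you want to confirm that each worker holds exactly one copy, the distributed client can report where the scattered data lives. A minimal sketch, reusing the client and arg_f from the snippet above (the exact output depends on your cluster):

# who_has maps the scattered future's key to the workers holding its data;
# with broadcast=True, every worker in the cluster should be listed.
print(client.who_has([arg_f]))

# has_what is the inverse view: the keys currently stored on each worker.
print(client.has_what())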
