I am using Dask to extend Dask bag items with information from an external, previously computed object arg. Dask seems to allocate memory for arg for every partition at once at the beginning of the computation. Is there a workaround that prevents Dask from duplicating arg many times (and allocating a lot of memory)?
Here is a simplified example:
from pathlib import Path

import numpy as np
import pandas as pd
from dask import bag

in_dir = Path.home() / 'in_dir'
out_dir = Path.home() / 'out_dir'
in_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)

# Generate some input files.
n_files = 100
n_lines_per_file = int(1e6)
df = pd.DataFrame({'a': np.arange(n_lines_per_file).astype(str)})
for i in range(n_files):
    df.to_csv(in_dir / f'{i}.txt', index=False, header=False)

def mapper(x, arg):
    y = x  # map x to y using arg
    return y

# A large, previously computed object.
arg = np.zeros(int(1e7))

(
    bag
    .read_text(str(in_dir / '*.txt'))
    .map((lambda x, y: x), arg)
    .to_textfiles(str(out_dir / '*.txt'))
)
Answer
One strategy for dealing with this is to scatter your data to the workers first:
import dask.bag
import dask.distributed

client = dask.distributed.Client()

arg = np.zeros(int(1e7))
# Send arg to every worker once; arg_f is a Future referring to the remote copies.
arg_f = client.scatter(arg, broadcast=True)

(
    dask.bag
    .read_text(str(in_dir / '*.txt'))
    .map((lambda x, y: x), arg_f)
    .to_textfiles(str(out_dir / '*.txt'))
)
This sends a copy of the data to each worker, but does not create a copy for each task.
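If you want to confirm the broadcast worked, the distributed client can report which workers hold the scattered key. This is a minimal sketch that assumes the client and arg_f from the snippet above; with broadcast=True the key should show up on every worker:

# Ask the scheduler where the scattered data lives.
# Assumes `client` and `arg_f` are defined as above.
locations = client.who_has(arg_f)  # maps the future's key to worker addresses
print(locations)

# Compare against the total number of workers; with broadcast=True
# every worker address should appear for the single key.
print(len(client.scheduler_info()['workers']))

Because the tasks only carry a reference to the scattered future, the task graph stays small and arg is deserialized once per worker rather than once per task.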