I am using Dask to extend dask bag items with information from an external, previously computed object arg. Dask seems to allocate memory for arg for each partition all at once at the beginning of the computation.
Is there a workaround to prevent Dask from duplicating arg multiple times (and allocating a lot of memory)?
Here is a simplified example:
from pathlib import Path

import numpy as np
import pandas as pd
from dask import bag

in_dir = Path.home() / 'in_dir'
out_dir = Path.home() / 'out_dir'

in_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)

n_files = 100
n_lines_per_file = int(1e6)

df = pd.DataFrame({
    'a': np.arange(n_lines_per_file).astype(str)
})

for i in range(n_files):
    df.to_csv(in_dir / f'{i}.txt', index=False, header=False)


def mapper(x, arg):
    y = x  # map x to y using arg
    return y


arg = np.zeros(int(1e7))

(
    bag
    .read_text(str(in_dir / '*.txt'))
    .map((lambda x, y: x), arg)
    .to_textfiles(str(out_dir / '*.txt'))
)
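My understanding (which may be wrong) is that passing the plain NumPy array to map embeds it in every map task, so the array ends up being serialized once per partition when tasks are shipped to worker processes. Below is a minimal sketch, not part of the example above, that approximates this per-task serialization on a toy bag; the exact low-level graph layout depends on the Dask version, so treat it as an illustration rather than a definitive measurement:

import pickle

import numpy as np
from dask import bag


def toy_mapper(x, arg):
    # Placeholder: a real mapper would use arg to transform x.
    return x


arg = np.zeros(int(1e6))  # ~8 MB of float64

b = bag.from_sequence(range(8), npartitions=4).map(toy_mapper, arg)

# Serialize the graph's tasks one by one (roughly what happens when tasks
# are dispatched to worker processes); if the array is embedded in each map
# task, the total grows with the number of partitions.
tasks = list(dict(b.__dask_graph__()).values())
total_bytes = sum(len(pickle.dumps(t)) for t in tasks)
print(f'{len(tasks)} tasks, ~{total_bytes / 1e6:.0f} MB serialized in total')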
Answer
One strategy for dealing with this is to scatter your data to the workers first:
import dask.bag, dask.distributed

client = dask.distributed.Client()

arg = np.zeros(int(1e7))
arg_f = client.scatter(arg, broadcast=True)

(
    dask.bag
    .read_text(str(in_dir / '*.txt'))
    .map((lambda x, y: x), arg_f)
    .to_textfiles(str(out_dir / '*.txt'))
)
This sends a copy of the data to each worker, but does not create a copy for each task.
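If you want to confirm that the broadcast happened, one option (a small sketch, reusing the client and arg_f from the snippet above) is to ask the scheduler which workers hold a replica of the scattered array:

# who_has maps the future's key to the worker addresses holding a replica;
# with broadcast=True every worker in the cluster should be listed.
print(client.who_has([arg_f]))

Inside the tasks, the scattered data is resolved on the workers, so the mapping function should see the underlying NumPy array rather than the Future itself.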