I’m running a function on a Dask bag to dump data into a NoSQL DB, like this:
```python
import csv

import dask.bag as dbag  # aliased so it doesn't shadow the DB client `db`

# `db` (the NoSQL client/collection) and `int_cols` are defined elsewhere.


def write_to_db(x):
    # code to write into db
    db.insert_many(x)
    return  # implicitly returns None


def func(x):
    # code to process each element
    for col in int_cols:
        try:
            x[col] = int(x[col])
        except (ValueError, TypeError):
            x[col] = None
    return x


bag = dbag.read_text(...)  # input path(s) not shown
bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
bag.compute()
```
Now, when I look at the Dask task graph, each partition that has finished `write_to_db` is shown as `memory` instead of `released`.
My questions:
- How can I tell Dask that there is no return value, so that the memory is marked as released? For example, in the following image, I want the red squares on the right-hand side to be marked as `released`, i.e., blue.
- Does `func()` release the GIL? Is there any way to optimize this kind of computation?
- Is this the correct way to do this kind of computation (inserting into the DB by passing a custom function to `map_partitions`)?
Answer
- Yes, Dask is holding the implicit `None` return values in memory as results, but these are small, and I wouldn’t worry. The output of your `compute()` will be a bunch of `None`s (actually, to keep the bag paradigm, you might want `write_to_db` to return a list).
- Dask does not release the GIL for you, but the DB function you call might; read that project's docs. If it does not release the GIL, you might see better performance with more processes and fewer threads per process.
- This seems like a fine way to go. A version using `dask.delayed` would likely be about the same number of lines.
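A minimal sketch of returning a list from the write step, assuming a pymongo-style `insert_many` that accepts an iterable of dicts. `FakeCollection`, `INT_COLS`, and `to_ints` are hypothetical names used only to make the example self-contained. Here `write_to_db` returns a one-element list holding the number of records written, so each partition's result is a tiny list and `compute()` returns a flat list of counts instead of bare `None`s.

```python
import dask.bag as dbag


class FakeCollection:
    """Hypothetical in-memory stand-in for a NoSQL collection."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)


collection = FakeCollection()
INT_COLS = ["a"]  # hypothetical list of integer columns


def to_ints(record):
    # Convert integer columns, falling back to None on bad values.
    for col in INT_COLS:
        try:
            record[col] = int(record[col])
        except (ValueError, TypeError):
            record[col] = None
    return record


def write_to_db(partition):
    # Materialize the partition, write it, and return a one-element list
    # with the record count rather than an implicit None.
    records = list(partition)
    if records:
        collection.insert_many(records)
    return [len(records)]


records = [{"a": "1"}, {"a": "x"}, {"a": None}, {"a": "4"}]
bag = dbag.from_sequence(records, npartitions=2)
counts = bag.map(to_ints).map_partitions(write_to_db).compute(scheduler="sync")
print(counts)  # [2, 2]
```

`scheduler="sync"` is used here only because the in-memory stand-in must live in the calling process. Bags use the multiprocessing scheduler by default, so with a real database client (which often has to be created inside each worker process) you could keep the default or pass `scheduler="processes"`, in line with the more-processes, fewer-threads suggestion above.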
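For comparison, a `dask.delayed` version might look like the sketch below, under the same assumptions: `FakeCollection`, `INT_COLS`, `to_ints`, and `process_file` are hypothetical, and two tiny CSV files are generated in a temporary directory only so the example runs on its own. Each delayed task parses one file, writes its rows, and returns just a row count.

```python
import csv
import os
import tempfile

import dask
from dask import delayed


class FakeCollection:
    """Hypothetical in-memory stand-in for a NoSQL collection."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)


collection = FakeCollection()
INT_COLS = ["a"]  # hypothetical list of integer columns


def to_ints(record):
    # Convert integer columns, falling back to None on bad values.
    for col in INT_COLS:
        try:
            record[col] = int(record[col])
        except (ValueError, TypeError):
            record[col] = None
    return record


@delayed
def process_file(path):
    # Parse one CSV file, convert its rows, write them, return a row count.
    with open(path, newline="") as f:
        records = [to_ints(row) for row in csv.DictReader(f)]
    if records:
        collection.insert_many(records)
    return len(records)


with tempfile.TemporaryDirectory() as tmpdir:
    paths = []
    for i, rows in enumerate([["1", "x"], ["3"]]):
        path = os.path.join(tmpdir, f"part{i}.csv")
        with open(path, "w", newline="") as f:
            f.write("a\n" + "\n".join(rows) + "\n")
        paths.append(path)
    # One delayed task per file; compute them together.
    counts = dask.compute(*[process_file(p) for p in paths], scheduler="sync")

print(counts)  # (2, 1)
```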