I’m running a function on a dask bag to dump data into a NoSQL DB, like this:
    import csv
    import dask.bag as dbag  # aliased so it does not shadow the database handle `db`

    def write_to_db(x):
        # code to write into db
        db.insert_many(x)
        return

    def func(x):
        # code to process each element
        for col in int_cols:
            try:
                x[col] = int(x[col])
            except (ValueError, TypeError):
                x[col] = None
        return x  # map() needs the processed element back

    bag = dbag.read_text(...)
    bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
    bag.compute()
Now when I look at the dask task graph, each partition is shown as "memory" instead of "released" after it completes the write_to_db function.
My Questions:
- How do I tell dask that there is no return value, so that the memory is marked as released? For example, in the following image, I want the red squares on the right-hand side to be marked as released, i.e., shown in blue.
- Does func() release the GIL? Is there any way to optimize this kind of compute?
- Am I doing this kind of computation the correct way (inserting into the db by passing a custom function to map_partitions)?
Answer
- Yes, Dask is holding the implicit return None values as the result in memory, but these are small, and I wouldn’t worry. The output of your compute() will be a set of Nones (actually, to keep the bag paradigm, you might want to have write_to_db return a list instead).
- Dask does not release the GIL for you, but the DB function you call might; read the docs of that project. If it does not release the GIL, you might see better performance with more processes and fewer threads per process.
- This seems like a fine way to go. A version using dask.delayed would likely be about the same number of lines.
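To make the first two points concrete, here is a minimal sketch, not a definitive implementation. It assumes a hypothetical in-memory FakeCollection in place of the real database client, a hypothetical column list INT_COLS, and a helper named run that is not part of the question. write_to_db returns a one-element list holding the row count, so each partition result stays a tiny bag-friendly value, and the scheduler argument lets you try processes instead of threads if the DB call turns out to hold the GIL.

```python
import csv

INT_COLS = ["age"]  # hypothetical column names to cast to int


class FakeCollection:
    """Hypothetical in-memory stand-in for a NoSQL collection."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)


collection = FakeCollection()


def clean_row(row):
    """Cast the integer columns, storing None where the cast fails."""
    row = dict(row)
    for col in INT_COLS:
        try:
            row[col] = int(row[col])
        except (ValueError, TypeError):
            row[col] = None
    return row


def write_to_db(partition):
    """Insert one partition and return a one-element list with its row count."""
    docs = list(partition)
    if docs:
        collection.insert_many(docs)
    return [len(docs)]  # a list keeps the result a valid bag partition


def run(path_pattern, scheduler="threads", num_workers=None):
    """Build and run the pipeline; returns the total number of rows written."""
    import dask.bag as dbag  # imported here so the helpers above work without dask

    bag = dbag.read_text(path_pattern)
    counts = (
        bag.map_partitions(csv.DictReader)
        .map(clean_row)
        .map_partitions(write_to_db)
    )
    kwargs = {"scheduler": scheduler}
    if num_workers is not None:
        kwargs["num_workers"] = num_workers
    return sum(counts.compute(**kwargs))
```

A call such as run("data/*.csv", scheduler="processes", num_workers=4) (the path is a placeholder) would use the multiprocessing scheduler. Note that with processes each worker gets its own copy of module globals, so the in-memory FakeCollection in the parent would stay empty; with a real database you would most likely open the connection inside write_to_db rather than share one handle.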
