I’m running a function on a Dask bag to dump data into a NoSQL DB, like this:
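```python
import csv

import dask.bag as dbag  # aliased so it does not shadow the database handle `db`

# `db` (the NoSQL collection) and `int_cols` are defined elsewhere.

def write_to_db(x):
    # code to write into db: insert the whole partition in one batch
    db.insert_many(x)
    return

def func(x):
    # code to process each element: coerce integer columns, bad values -> None
    for col in int_cols:
        try:
            x[col] = int(x[col])
        except (ValueError, TypeError):
            x[col] = None
    return x

bag = dbag.read_text(...)
bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
bag.compute()
```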
Now, when I look at the Dask task graph, each partition that has finished write_to_db is shown as "memory" instead of "released".
My Questions:
- How do I tell Dask that there is no return value, so that the memory is marked as released? For example, in the following image, I want the red squares on the right-hand side to be marked as released, i.e., blue.
- Does func() release the GIL? Is there any way to optimize this kind of computation?
- Am I doing this kind of computation the right way (inserting into the DB by passing a custom function to map_partitions)?
Answer
- Yes, Dask is holding the implicit return None values as the results in memory, but these are small, and I wouldn’t worry about them. The output of your compute() will be a collection of Nones, one per partition (actually, to keep to the bag paradigm, you might want to return a list instead).
- Dask does not release the GIL for you, and func itself is plain Python, so it holds the GIL while it runs. The DB function you call might release it, so check that project's docs; if it does not, you might see better performance with more processes and fewer threads per process.
- This seems like a fine way to go. A version using dask.delayed would likely take about the same number of lines.
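A minimal sketch of the "return a list" suggestion, assuming a pymongo-style collection with an insert_many method. FakeCollection, the sample rows and int_cols = ["age"] are hypothetical stand-ins, and from_sequence replaces read_text plus csv.DictReader so the example runs without any files. Each partition returns a one-element list holding its insert count, so compute() yields one small integer per partition instead of a None.

```python
import dask.bag as dbag


class FakeCollection:
    """Hypothetical in-memory stand-in for a NoSQL collection such as a
    pymongo Collection; only insert_many is modelled."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)


db = FakeCollection()
int_cols = ["age"]  # hypothetical list of integer columns


def func(x):
    # Coerce each integer column; values that cannot be parsed become None.
    for col in int_cols:
        try:
            x[col] = int(x[col])
        except (ValueError, TypeError):
            x[col] = None
    return x


def write_to_db(partition):
    # Materialise the partition once and insert it in a single batch.
    records = list(partition)
    if records:
        db.insert_many(records)
    # Return a one-element list so the bag result is a small list of
    # per-partition counts rather than a None per partition.
    return [len(records)]


# Hypothetical sample rows standing in for read_text(...) + csv.DictReader.
rows = [
    {"name": "a", "age": "1"},
    {"name": "b", "age": "x"},
    {"name": "c", "age": "3"},
]
bag = dbag.from_sequence(rows, npartitions=2)

# scheduler="sync" keeps everything in this process, so the in-memory
# FakeCollection actually receives the inserts.
counts = bag.map(func).map_partitions(write_to_db).compute(scheduler="sync")
print(counts)
```

With a real database you would normally drop scheduler="sync": bags use the multiprocessing scheduler by default, which already avoids GIL contention for pure-Python work like func. The database client then has to be usable inside each worker process, so creating it inside write_to_db may be simpler than sharing a global handle.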
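For comparison, here is a sketch of the dask.delayed variant, under the same assumptions (FakeCollection, int_cols and the two CSV text chunks are hypothetical; in practice each chunk might be the contents of one of your files). Each delayed task parses one chunk with csv.DictReader, cleans the rows with func and inserts them, returning a row count rather than None.

```python
import csv
import io

import dask


class FakeCollection:
    """Hypothetical in-memory stand-in for a NoSQL collection; only
    insert_many is modelled."""

    def __init__(self):
        self.docs = []

    def insert_many(self, docs):
        self.docs.extend(docs)


db = FakeCollection()
int_cols = ["age"]  # hypothetical list of integer columns


def func(x):
    # Coerce each integer column; values that cannot be parsed become None.
    for col in int_cols:
        try:
            x[col] = int(x[col])
        except (ValueError, TypeError):
            x[col] = None
    return x


@dask.delayed
def load_and_write(csv_text):
    # Parse one chunk of CSV text (with its own header row), clean every
    # row and insert the chunk in one batch; return a count instead of None.
    records = [func(row) for row in csv.DictReader(io.StringIO(csv_text))]
    if records:
        db.insert_many(records)
    return len(records)


# Hypothetical CSV chunks standing in for the real input files.
chunks = ["name,age\na,1\nb,x\n", "name,age\nc,3\n"]

# dask.compute returns a tuple with one result per delayed task.
counts = dask.compute(*[load_and_write(c) for c in chunks], scheduler="sync")
print(counts)
```

As with the bag version, the line count is similar; the main difference is that you decide how the input is split into tasks yourself.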
