I'm running a function on a dask bag to dump data into a NoSQL DB, like this:
    def write_to_db(x):
        # code to write into db
        db.insert_many(x)
        return

    def func():
        # code to process each element
        for col in int_cols:
            try:
                x[col] = int(x[col])
            except (ValueError, TypeError):
                x[col] = None

    import dask.bag as db
    bag = db.read_text(...)
    bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
    bag.compute()
Now when I look at the dask task graph, after each partition completes the write_to_db function, it is shown as memory instead of released.
My Questions:
- How do I tell dask that there is no return value, so that the memory is marked as released? For example, in the following image, I want the red squares on the right-hand side to be marked as released, i.e., blue.
- Does func() release the GIL? Is there any way to optimize this kind of compute?
- Am I doing this kind of computation the correct way (inserting into the db by passing a custom function to map_partitions)?
Answer
- Yes, Dask is holding the implicit return None values as the results in memory, but these are small, and I wouldn't worry. The output of your compute() will be a set of Nones (actually, to keep the bag paradigm, you might want to make the return value a list).
- Dask does not release the GIL for you, but the DB function you call might; read the docs of that project. If it does not release the GIL, you might see better performance with more processes and fewer threads per process.
- This seems like a fine way to go. A version using dask.delayed would likely be about the same number of lines.
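To make the points above concrete, here is a minimal sketch, not a definitive implementation. It assumes a stand-in `FakeCollection` class in place of a real DB client, a hypothetical `age` column, and a few made-up records; none of these come from the question. The write function returns a one-element list with the number of rows written, so each partition's result stays tiny and compute() yields a flat list of counts. The second half is a rough dask.delayed equivalent for comparison.

```python
import dask
import dask.bag as dbag

INT_COLS = ["age"]  # hypothetical column name, for illustration only


class FakeCollection:
    """Stand-in for a real DB collection; it only records inserted rows."""

    def __init__(self):
        self.rows = []

    def insert_many(self, docs):
        self.rows.extend(docs)


collection = FakeCollection()


def clean(row):
    """Coerce integer columns; values that cannot be parsed become None."""
    row = dict(row)  # copy so the input record is not mutated
    for col in INT_COLS:
        try:
            row[col] = int(row[col])
        except (ValueError, TypeError):
            row[col] = None
    return row


def write_to_db(partition):
    """Write one partition and return a small list, keeping the bag shape."""
    docs = list(partition)
    if docs:
        collection.insert_many(docs)
    return [len(docs)]


# Made-up sample records standing in for the parsed CSV rows.
records = [
    {"id": "1", "age": "34"},
    {"id": "2", "age": "n/a"},
    {"id": "3", "age": None},
]

bag = dbag.from_sequence(records, npartitions=2)
# The threaded scheduler is chosen here only so the in-memory stand-in is
# shared between tasks. With scheduler="processes", each worker process
# would need to open its own DB connection inside write_to_db.
counts = bag.map(clean).map_partitions(write_to_db).compute(scheduler="threads")
print("rows written per partition:", counts)


# Roughly the same thing with dask.delayed: one task per chunk of records.
@dask.delayed
def clean_and_write(chunk):
    docs = [clean(row) for row in chunk]
    collection.insert_many(docs)
    return len(docs)


chunks = [records[:2], records[2:]]
delayed_counts = dask.compute(
    *[clean_and_write(chunk) for chunk in chunks], scheduler="threads"
)
print("rows written per delayed task:", delayed_counts)
```

If the DB client's insert call does not release the GIL, switching the `scheduler` argument to `"processes"` is one way to try the "more processes, fewer threads" suggestion, provided the connection is created inside the task rather than shared as a global.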