
Writing dask bag to DB using custom function

I’m running a function on a dask bag to dump data into a NoSQL DB, like:

def write_to_db(x):
    # code to write into db
    # (db_client is the NoSQL client, renamed here so it isn't
    # shadowed by the `import dask.bag as db` below)
    db_client.insert_many(list(x))
    return

def func(x):
    # code to process each element
    for col in int_cols:
        try:
            x[col] = int(x[col])
        except (ValueError, TypeError):
            x[col] = None
    return x

import dask.bag as db

bag = db.read_text(...)
bag = bag.map_partitions(csv.DictReader).map(func).map_partitions(write_to_db)
bag.compute()

Now when I look at the dask task graph, after each partition completes the write_to_db function, its result is shown as "in memory" instead of "released".

My Questions:

  1. How do I tell dask that there is no return value, so the memory is marked as released? For example, in the following image I want the red squares on the right-hand side to be marked as released, i.e., shown in blue.
  2. Does func() release the GIL? Is there any way to optimize this kind of compute?
  3. Is this the correct way to do this kind of computation (inserting into a DB by passing a custom function to map_partitions)?

Dask Task Graph


Answer

  1. Yes, Dask is holding the implicit return None values as results in memory, but these are small, and I wouldn’t worry. The output of your compute() will be a set of Nones (actually, to keep the bag paradigm, you might want to make this a list).
  2. Dask does not release the GIL for you, but the DB function you call might; read the docs of that project. If it does not release the GIL, you may see better performance with more processes and fewer threads per process.
  3. This seems like a fine way to go. A version using dask.delayed would likely be about the same number of lines.
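To make points 1 and 3 concrete, here is a runnable sketch of the dask.delayed alternative. `write_partition` is a hypothetical stand-in for write_to_db: instead of returning None it returns a record count, so compute() collects something small and meaningful per partition. The actual DB call is commented out, and from_sequence stands in for read_text(...) to keep the example self-contained.

```python
import dask
import dask.bag as db

def write_partition(rows):
    # Materialize the partition and write it out; return only a count,
    # not the data, so the task's held result stays tiny.
    records = list(rows)
    # db_client.insert_many(records)  # hypothetical NoSQL client call
    return len(records)

# from_sequence stands in for db.read_text(...) so the sketch runs as-is
bag = db.from_sequence([{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}], npartitions=2)

# One delayed write task per bag partition
writes = [dask.delayed(write_partition)(part) for part in bag.to_delayed()]
counts = dask.compute(*writes)
print(counts)
```

If your DB driver does not release the GIL (point 2), the same code can be run with `dask.compute(*writes, scheduler="processes")` to use processes instead of threads.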
User contributions licensed under: CC BY-SA