
Dask dataframe crashes

I’m loading a large Parquet dataframe with Dask, but I can’t do anything with it without the system crashing on me, or getting a flood of errors and no output.

The data weighs about 165 MB compressed, or 13 GB once loaded into pandas (it fits comfortably in the 45 GB of RAM available).

import pandas as pd

df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly

Instead, using Dask:

from dask.distributed import Client
import dask.dataframe as dataframe

client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9 

prints

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)

The same happens if I try df.head(), df.set_index(…) or any other operation that actually computes anything on the dataframe. I’ve tried reducing the number of workers, so that each has more memory. I’ve also tried repartitioning the dataframe, but it fails with the same error. If I set memory_limit on the client’s LocalCluster to zero, the system crashes outright.

What am I doing wrong?

Edit: Here’s some extra information on the data (obtained by loading it with pandas):

In [2]: print(df.dtypes)                                                                                                            
market_id          uint32
choice_id          uint64
attribute_1          bool
attribute_2          bool
attribute_3          bool
income            float32
is_urban             bool
distance          float32
weight            float32
quarter            uint32
product_id          int64
price             float64
size              float32
share             float32
market_quarter      int64
product_type       object
outside_option      int64
dtype: object

In [3]: print(df.shape)                                                                                                             
(89429613, 17)

The product_type object column contains strings.


Answer

Dask works by loading and processing your data chunk-wise. In the case of Parquet, the origin of that chunking is the data files themselves: internally, Parquet is organised into “row groups”, sets of rows that are meant to be read together.
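You can check how the file is laid out before handing it to Dask. A minimal sketch, assuming pyarrow is installed and using the file path from the question:

import pyarrow.parquet as pq

pf = pq.ParquetFile('data_simulated.parquet')
print(pf.metadata.num_row_groups)  # number of chunks Dask can read independently
print(pf.metadata.num_rows)        # total number of rows in the file

If num_row_groups is 1, Dask has no natural way to partition the file.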

It sounds like, in this case, the entire dataset consists of a single row group in one file. This means that Dask has no opportunity to split the data into chunks: you get one task, which puts the full memory pressure on one worker (roughly the total data size plus some temporary values), and that worker has only been allocated a fraction of the total system memory. Hence the errors.

Note that you can turn off memory monitoring, so that workers are not killed, either in the configuration or directly with keywords like memory_limit=0. In this case you know that only one worker will be doing the load.
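A minimal sketch of that setup, assuming a single large worker is acceptable (n_workers=1 and threads_per_worker=4 are illustrative choices, not requirements):

import dask.dataframe as dd
from dask.distributed import Client

# memory_limit=0 disables the nanny's memory checks, so the one worker doing
# the load is not restarted when it crosses the 95% budget.
client = Client(n_workers=1, threads_per_worker=4, memory_limit=0)

df = dd.read_parquet('data_simulated.parquet')
print(df.memory_usage(deep=True).sum().compute() * 1e-9)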

In some very specific situations (no nesting/list/map types), it would be possible to split row-groups, but code for this does not exist, and it would be inefficient due to the compression and encoding of the data.
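If you control how the file is written, one workaround consistent with the above (not something Dask does for you) is to rewrite it with many smaller row groups, so that Dask can partition it on read. A sketch, assuming the pyarrow engine; row_group_size=2_000_000 is an arbitrary illustrative value:

import pandas as pd

# Fits in RAM per the question, so rewrite once with pandas/pyarrow.
df = pd.read_parquet('data_simulated.parquet')
df.to_parquet('data_simulated_chunked.parquet',
              engine='pyarrow',
              row_group_size=2_000_000)  # forwarded to pyarrow's write_table

Dask can then see multiple row groups in the rewritten file instead of a single monolithic one.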
