I’m loading a large parquet dataframe with Dask, but I can’t seem to do anything with it without the system crashing on me, or getting a million errors and no output.
The data is about 165 MB compressed, or 13 GB once loaded into pandas (it fits comfortably in the 45 GB of RAM available).
    import pandas as pd

    df = pd.read_parquet('data_simulated.parquet')
    df.memory_usage(deep=True).sum() * 1e-9  # returns 13.09
    df.head()  # prints the head of the dataframe properly
Instead, if I use Dask:
    from dask.distributed import Client
    import dask.dataframe as dataframe

    client = Client()
    # prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
    df = dataframe.read_parquet('data_simulated.parquet')
    df.memory_usage(deep=True).sum().compute() * 1e-9
this prints
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    distributed.nanny - WARNING - Restarting worker
    [a large traceback]
    KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)
The same happens if I try df.head(), df.set_index(…), or any other operation that actually computes anything on the dataframe. I’ve tried reducing the number of workers, so that each has more memory. I’ve also tried repartitioning the dataframe, but it fails with the same error. If I set memory_limit on the client’s LocalCluster to zero, the system just crashes outright.
What am I doing wrong?
Edit: Here’s some extra info on the data (obtained by loading it with pandas):
    In [2]: print(df.dtypes)
    market_id          uint32
    choice_id          uint64
    attribute_1          bool
    attribute_2          bool
    attribute_3          bool
    income            float32
    is_urban             bool
    distance          float32
    weight            float32
    quarter            uint32
    product_id          int64
    price             float64
    size              float32
    share             float32
    market_quarter      int64
    product_type       object
    outside_option      int64
    dtype: object

    In [3]: print(df.shape)
    (89429613, 17)
The product_type column (object dtype) contains strings.
Answer
Dask works by loading and processing your data chunk-wise. In the case of parquet, that chunking comes from the data files themselves: internally, parquet is organised into “row-groups”, sets of rows that are meant to be read together.
It sounds like, in this case, the entire dataset consists of a single row-group in a single file. This means Dask has no opportunity to split the data into chunks: you get one task, which puts the full memory pressure (probably the total data size plus some temporary values) on one worker, and that worker has only been allocated a fraction of the total system memory. Hence the errors.
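If you want to verify this, and assuming the file can be opened with pyarrow, a quick sketch to inspect the row-group layout:

    import pyarrow.parquet as pq

    # How many row-groups does the file contain? A single row-group means
    # Dask gets one partition and one very large task.
    pf = pq.ParquetFile('data_simulated.parquet')
    print(pf.num_row_groups)
    print(pf.metadata.row_group(0).num_rows)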
Note that you can turn off memory monitoring to prevent workers from getting killed, either in the configuration or directly with keywords like memory_limit=0. In this case, you know that only one worker will be doing the load.
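A minimal sketch of that setup (assuming a LocalCluster on the same machine; n_workers=1 keeps all of the memory with the one worker that will do the load):

    from dask.distributed import Client
    import dask.dataframe as dataframe

    # memory_limit=0 disables the nanny's memory monitoring, so the worker
    # is not restarted when it crosses the 95% threshold.
    client = Client(n_workers=1, memory_limit=0)
    df = dataframe.read_parquet('data_simulated.parquet')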
In some very specific situations (no nested/list/map types), it would be possible to split row-groups, but the code for this does not exist, and it would be inefficient because of the compression and encoding of the data.
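Since the data does fit in your RAM, one workaround is a one-off pass through pandas to rewrite the file with many smaller row-groups, so that Dask has something to chunk on. A sketch, assuming the pyarrow engine; row_group_size is a pyarrow write option and the value is only illustrative:

    import pandas as pd

    df = pd.read_parquet('data_simulated.parquet')
    # Write roughly 5 million rows per row-group; Dask can then read the
    # file as multiple partitions instead of one giant task.
    df.to_parquet('data_simulated_chunked.parquet', row_group_size=5_000_000)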