Empirically, it seems that whenever you set_index on a Dask dataframe, Dask always puts rows with equal index values into a single partition, even if this results in wildly imbalanced partitions.
Here is a demonstration:
```python
import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000
df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)
ddf = ddf.set_index('user')
counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64
```
However, I have found no guarantee of this behaviour documented anywhere. I tried to sift through the code myself but gave up; I believe one of these inter-related functions probably holds the answer:
When you set_index, is it the case that a single index value can never be in two different partitions? If not, under what conditions does this property hold?
Bounty: I will award the bounty to an answer that draws from a reputable source, for example by referring to the implementation to show that this property has to hold.
Answer
is it the case that a single index can never be in two different partitions?
No, it’s certainly allowed. Dask even intends for this to happen. However, because of a bug in set_index, all the data will still end up in one partition.
An extreme example (every row is the same value except one):
```python
In [1]: import dask.dataframe as dd

In [2]: import pandas as pd

In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})

In [4]: ddf = dd.from_pandas(df, npartitions=10)

In [5]: s = ddf.set_index("A")

In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)
```
As you can see, Dask intends for the 0s to be split up between multiple partitions. Yet when the shuffle actually happens, all the 0s still end up in one partition:
```python
In [7]: import dask

In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]:
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)
```
This is because the code deciding which output partition a row belongs to doesn’t consider duplicates in divisions. Treating divisions as a Series, it uses searchsorted with side="right", which is why all the data always ends up in the last partition.
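A simplified sketch of that lookup may make this concrete. This compresses Dask’s actual shuffle code, and the variable names are mine, but the searchsorted call mirrors the behaviour described: with side="right", the insertion point for a value lands *after* any run of duplicate divisions, so every duplicated index value maps to the last partition of the run.

```python
import numpy as np
import pandas as pd

# Divisions computed by set_index in the example above; duplicates allowed.
divisions = pd.Series([0, 0, 0, 0, 0, 0, 0, 1])

# Index values of the rows being shuffled: one 0 and twenty 1s.
index_values = pd.Series([0] + [1] * 20)

# side="right" returns the insertion point after any duplicates, so each 0
# skips past all seven duplicate 0-divisions instead of spreading out.
partitions = divisions.searchsorted(index_values, side="right") - 1

# Rows equal to the final division are assigned to the last partition.
partitions[(index_values >= divisions.iloc[-1]).to_numpy()] = len(divisions) - 2

# Every row lands in the same (last) partition of the seven.
print(np.unique(partitions))
```

With divisions that have no duplicates, the same lookup spreads rows across partitions as intended; the collapse only happens when divisions repeat.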
I’ll update this answer when the issue is fixed.