Empirically, it seems that whenever you call set_index on a Dask DataFrame, Dask puts all rows with equal index values into a single partition, even if that results in wildly imbalanced partitions.
Here is a demonstration:
import pandas as pd
import dask.dataframe as dd
users = [1]*1000 + [2]*1000 + [3]*1000
df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)
ddf = ddf.set_index('user')
counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500 1000
# 999 2000
# dtype: int64
However, I found no guarantee of this behaviour anywhere.
I tried to sift through the code myself but gave up. I believe one of the inter-related shuffle functions probably holds the answer.
When you set_index, is it the case that a single index value can never be in two different partitions? If not, then under what conditions does this property hold?
Bounty: I will award the bounty to an answer that draws on a reputable source, for example by referring to the implementation to show that this property must hold.
Answer
is it the case that a single index can never be in two different partitions?
No, it’s certainly allowed, and Dask even intends for it to happen. However, because of a bug in set_index, all the data still ends up in one partition.
An extreme example (every row is the same value except one):
In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)
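For context, here is my reading of what a divisions tuple means (a sketch, not a quote from the docs): n + 1 boundaries describe n partitions, where partition i is meant to hold index values in [divisions[i], divisions[i+1]), with the last partition also closed on the right. Reading the tuple above that way:

```python
# Sketch: interpreting the divisions tuple from Out[6] above.
divisions = (0, 0, 0, 0, 0, 0, 0, 1)

npartitions = len(divisions) - 1  # 8 boundaries -> 7 partitions
print(npartitions)  # -> 7

# Intended boundary pair for each partition (the repeated 0s mean
# Dask plans to spread the 0s across the first several partitions).
for i in range(npartitions):
    print(i, divisions[i], divisions[i + 1])
```

The six (0, 0) pairs are partitions whose entire intended range is the single value 0, which is why the answer says Dask intends for the 0s to be split up.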
As you can see, Dask intends for the 0s to be split up between multiple partitions. Yet when the shuffle actually happens, all the 0s still end up in one partition:
In [7]: import dask
In [8]: dask.compute(s.to_delayed()) # easy way to see the partitions separately
Out[8]:
([Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)
This is because the code that decides which output partition a row belongs to doesn’t account for duplicates in divisions. Treating divisions as a Series, it uses searchsorted with side="right", which is why all the data always ends up in the last partition.
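A minimal pandas-only sketch of the faulty rule as I understand it (this is an illustration, not Dask's actual shuffle code): with duplicated boundaries, searchsorted with side="right" jumps past every copy of the repeated value, so every matching row maps to the last partition.

```python
import pandas as pd

# divisions from the example above: 8 boundaries -> 7 partitions (0..6)
divisions = pd.Series([0, 0, 0, 0, 0, 0, 0, 1])

def output_partition(value):
    # side="right" skips past every duplicate boundary, so any value
    # equal to a repeated division lands at the far end.
    pos = divisions.searchsorted(value, side="right") - 1
    # clamp into the valid partition range [0, npartitions - 1]
    return int(min(max(pos, 0), len(divisions) - 2))

print(output_partition(0))  # -> 6 (the last of the 7 partitions)
print(output_partition(1))  # -> 6 (also the last partition)
```

Note that side="left" alone wouldn't help either: it would send every 0 to the first partition instead. The duplicates in divisions simply aren't handled, which matches the behaviour shown in Out[8].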
I’ll update this answer when the issue is fixed.