
Dask dataframe: Can `set_index` put a single index into multiple partitions?

Empirically, it seems that whenever you call `set_index` on a Dask dataframe, Dask always puts rows with equal index values into a single partition, even if this results in wildly imbalanced partitions.

Here is a demonstration:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

However, I found no guarantee of this behaviour anywhere.

I tried to sift through the code myself but gave up. I believe the answer lies in one of these interrelated functions:

When you call `set_index`, is it guaranteed that a single index value can never end up in two different partitions? If not, under what conditions does this property hold?

Bounty: I will award the bounty to an answer that draws from a reputable source, for example one that refers to the implementation to show that this property must hold.



is it the case that a single index can never be in two different partitions?

No, it’s certainly allowed; Dask even intends for it to happen. However, because of a bug in `set_index`, all the data for a given index value will still end up in one partition.

An extreme example (every row has the same value except one):

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

As you can see, Dask intends for the value 0 to be spread across multiple partitions. Yet when the shuffle actually happens, every row, the 0 included, still ends up in a single partition (the last one):

In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)
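Eyeballing the partition list works for a toy example, but for larger data it helps to check programmatically whether any index value appears in more than one partition. The sketch below is a hypothetical helper (`values_spanning_partitions` is not part of Dask); it takes a plain list of pandas DataFrames, such as the first element of `dask.compute(s.to_delayed())`, and uses hand-built partitions here so it runs with pandas alone.

```python
import pandas as pd


def values_spanning_partitions(partitions):
    """Return {index value: [partition numbers]} for values found in 2+ partitions.

    `partitions` is a list of pandas DataFrames, one per partition
    (hypothetical helper; e.g. fed with dask.compute(s.to_delayed())[0]).
    """
    seen = {}
    for i, part in enumerate(partitions):
        # Record each distinct index value once per partition.
        # tolist() converts numpy scalars to plain Python values.
        for value in part.index.unique().tolist():
            seen.setdefault(value, []).append(i)
    # Keep only the values that were found in two or more partitions.
    return {value: parts for value, parts in seen.items() if len(parts) > 1}


# Hand-built partitions standing in for real Dask output.
split = [
    pd.DataFrame(index=pd.Index([0, 0], name="A")),
    pd.DataFrame(index=pd.Index([0, 1], name="A")),
]
together = [
    pd.DataFrame(index=pd.Index([], name="A", dtype="int64")),
    pd.DataFrame(index=pd.Index([0, 1, 1], name="A")),
]

print(values_spanning_partitions(split))     # {0: [0, 1]}
print(values_spanning_partitions(together))  # {}
```

Applied to the partitions above, it should return an empty dict, since every row landed in the last partition.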

This is because the code that decides which output partition a row belongs to doesn’t account for duplicates in the divisions. Treating divisions as a Series, it uses searchsorted with side="right", which is why all rows matching a duplicated division value land in the last of the partitions that share it; here, that is the final partition.
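To make the searchsorted behaviour concrete, here is a minimal pandas sketch of the assignment rule described above, applied to the divisions and data from the example. It is a simplification, not Dask’s actual implementation: `assign_partitions` is a hypothetical name, and clamping values at or beyond the last division into the final partition is an assumption added so that the top value (1) does not fall off the end.

```python
import pandas as pd


def assign_partitions(values, divisions):
    """Map each value to an output partition number (simplified sketch)."""
    divisions = pd.Series(divisions)
    values = pd.Series(values)
    # side="right" returns the position *after* the last equal division,
    # so every duplicate of a division value maps to the same slot.
    partitions = divisions.searchsorted(values.to_numpy(), side="right") - 1
    # Assumption: values >= the last division are clamped into the final partition.
    last = len(divisions) - 2
    partitions[(values >= divisions.iloc[-1]).to_numpy()] = last
    return partitions


divisions = (0, 0, 0, 0, 0, 0, 0, 1)  # s.divisions from the example
values = [0] + [1] * 20               # column "A" from the example

print(assign_partitions(values, divisions))
# every entry is 6, i.e. the last of the 7 partitions
```

The 0 maps to position 7 - 1 = 6 because side="right" skips past all seven equal divisions, so the six partitions reserved for 0 (0 through 5) never receive anything.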

I’ll update this answer when the issue is fixed.

User contributions licensed under: CC BY-SA