Extracting latest values in a Dask dataframe with non-unique index column dates

I’m quite familiar with pandas dataframes, but I’m very new to Dask, so I’m still trying to wrap my head around parallelizing my code. I’ve already obtained my desired results using pandas and pandarallel, so what I’m trying to figure out is whether I can scale up the task or speed it up with Dask.

Let’s say my dataframe has datetimes as a non-unique index, a value column and an id column.

time                        value   id
2021-01-01 00:00:00.210281  28.08   293707
2021-01-01 00:00:00.279228  28.07   293708
2021-01-01 00:00:00.697341  28.08   293709
2021-01-01 00:00:00.941704  28.08   293710
2021-01-01 00:00:00.945422  28.07   293711
...     ...     ...
2021-01-01 23:59:59.288914  29.84   512665
2021-01-01 23:59:59.288914  29.83   512666
2021-01-01 23:59:59.288914  29.82   512667
2021-01-01 23:59:59.525227  29.84   512668
2021-01-01 23:59:59.784754  29.84   512669

What I want to extract is the latest value for every second, e.g. if the last row before 2021-01-01 00:00:01 is the one indexed 2021-01-01 00:00:00.945422, then the latest value for that second is 28.07.

In my case the index values are sometimes not unique, so as a tie breaker I’d like to use the id column: the row with the largest id is considered the latest value. For the three rows tied at 2021-01-01 23:59:59.288914, the value 29.82 would be chosen, since the largest id at that timestamp is 512667. Also note that id is not consistent throughout the dataset, so I cannot rely on it alone for ordering my data.

In pandas I simply do this by obtaining the last index:

last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]

and then, if last_values.index.is_unique is False, I finally perform last_values.sort_values('id').iloc[-1] as the tie break.
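
Putting those steps together, a condensed sketch of my current pandas logic looks like this (the double brackets are only there to always get a DataFrame back, so the tie break works even when the last timestamp is unique):

last_window = df.loc[date_minus60:date_curr]          # rows in the lookup window
last_values = df.loc[[last_window.index[-1]]]         # all rows at the latest timestamp
last_value = last_values.sort_values('id').iloc[-1]   # largest id wins the tie break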

I’ve been having a hard time translating this code to Dask; I keep running into issues where my delayed functions need to be computed before I can reindex my dataframe again.

I’d like to know if there are any best practices for dealing with this sort of problem.

Answer

The snippet below shows that the syntax is very similar:

import dask

# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")

# generate a pandas dataframe
df = ddf.partitions[0].compute()  # pandas df for example

# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"

# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]

# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()

# check equality of the results
print(last_values_pandas == last_values_dask)

Note that the distinction is in the two .compute steps in the Dask version, since two lazy values need to be computed: the first finds the correct index location and the second fetches the actual value. Also, this assumes that the data is already indexed by the timestamp; if it is not, it’s best to index the data before loading it into Dask, since .set_index is in general a slow operation.
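
For example, if the raw data arrives with time as an ordinary column, a rough sketch of indexing it in pandas before handing it over to Dask could look like this (the file name and partition count are placeholders):

import pandas as pd
import dask.dataframe as dd

# hypothetical raw file with time, value and id columns
pdf = pd.read_csv("ticks.csv", parse_dates=["time"])

# index (and sort) by timestamp in pandas, then hand the already-indexed
# frame to Dask, avoiding an expensive ddf.set_index shuffle later
pdf = pdf.set_index("time").sort_index()
ddf = dd.from_pandas(pdf, npartitions=24)  # e.g. one partition per hour of a day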

However, depending on what you are really after, this is probably not a great use of Dask. If the underlying idea is to do fast lookups, then a better solution is to use an indexed database (including specialised time-series databases).

Finally, the snippet above uses a unique index. If the actual data has non-unique indices, then the requirement to select by the largest id should be handled once last_values_dask is computed, using something like this (pseudo code, not expected to work right away):

def get_largest_id(last_values):
    # if several rows share the final timestamp, keep the one with the largest id
    return last_values.sort_values('id').tail(1)

last_values_dask = get_largest_id(last_values_dask)

There is scope for designing a better pipeline if the lookups happen in batches (rather than at specific sample dates).
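
For instance, if what you ultimately need is the last value of every second over a whole day, one rough sketch (assuming the data from the question is loaded as a Dask dataframe named ddf, indexed by timestamp with hourly partitions, so one-second groups never straddle a partition boundary) is to push the pandas tie-break logic into each partition:

import pandas as pd

def last_per_second(part: pd.DataFrame) -> pd.Series:
    # sort by id, then stably by time, so the last row of each second holds the
    # latest timestamp and, among timestamp ties, the largest id
    part = part.sort_values("id").sort_index(kind="mergesort")
    return part.groupby(part.index.floor("s"))["value"].last()

per_second = ddf.map_partitions(last_per_second, meta=("value", "f8")).compute()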
