I am trying to create a new column of lists in Pyspark using a groupby aggregation on an existing set of columns. An example input data frame is provided below:
------------------------
 id | date       | value
------------------------
 1  | 2014-01-03 | 10
 1  | 2014-01-04 | 5
 1  | 2014-01-05 | 15
 1  | 2014-01-06 | 20
 2  | 2014-02-10 | 100
 2  | 2014-03-11 | 500
 2  | 2014-04-15 | 1500
The expected output is:
 id | value_list
------------------------
 1  | [10, 5, 15, 20]
 2  | [100, 500, 1500]
The values within a list are sorted by the date.
I tried using collect_list as follows:
from pyspark.sql import functions as F

ordered_df = input_df.orderBy(['id', 'date'], ascending=True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
But collect_list doesn’t guarantee order even if I sort the input data frame by date before aggregation.
Could someone explain how to do the aggregation while preserving the order based on a second (date) variable?
Answer
If you collect both dates and values as a list, you can sort the resulting column according to date using a udf, and then keep only the values in the result.
import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df \
    .groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")).alias("list_col"))

# define udf
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df \
    .select("id", sort_udf("list_col").alias("sorted_list")) \
    .show(truncate=False)

+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+
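The same result can also be obtained without a Python udf by sorting the collected structs with built-in functions. This is a minimal sketch, assuming a Spark version whose sort_array can order arrays of structs (it compares struct fields in order, so date comes first); the column and variable names just follow the example above.

import pyspark.sql.functions as F

# collect (date, value) structs, sort them by date (the first struct field),
# then keep only the value field of each struct
no_udf_df = (
    input_df
    .groupby("id")
    .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("sorted_structs"))
    .select("id", F.col("sorted_structs.value").alias("sorted_list"))
)

no_udf_df.show(truncate=False)

A side benefit of this variant is that sorted_list stays a true array column, whereas F.udf with no return type declared returns its result as a string.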