I am trying to create a new column of lists in PySpark using a groupby aggregation on an existing set of columns. An example input data frame is provided below:
------------------------
id | date       | value
------------------------
1  | 2014-01-03 | 10
1  | 2014-01-04 | 5
1  | 2014-01-05 | 15
1  | 2014-01-06 | 20
2  | 2014-02-10 | 100
2  | 2014-03-11 | 500
2  | 2014-04-15 | 1500
The expected output is:
------------------------
id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]
The values within each list are sorted by date.
I tried using collect_list as follows:
from pyspark.sql import functions as F

ordered_df = input_df.orderBy(['id', 'date'], ascending=True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
But collect_list doesn't guarantee order, even if I sort the input data frame by date before aggregation.
Could someone suggest how to do the aggregation while preserving the order given by a second (date) variable?
Answer
If you collect both dates and values as a list, you can sort the resulting column according to date using a udf, and then keep only the values in the result.
import operator
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

# collect (date, value) structs into a list column per id
grouped_df = input_df.groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")).alias("list_col"))

# udf: sort the structs by date and keep only the values
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    return [item[1] for item in res]

# declaring the return type makes the result a real array column
# (without it, F.udf defaults to returning strings)
sort_udf = F.udf(sorter, ArrayType(IntegerType()))

# test
grouped_df.select("id", sort_udf("list_col").alias("sorted_list")) \
    .show(truncate=False)

+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+
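
As a side note, on recent Spark versions you can get the same result without a udf: sort_array sorts an array of structs field by field (so date, the first field, wins), and referencing a field on an array of structs yields an array of that field's values. A minimal sketch, assuming the same input_df as above:

import pyspark.sql.functions as F

# collect (date, value) structs, sort the array by date (the first struct
# field), then keep just the values -- no Python udf needed
no_udf_df = (
    input_df.groupby("id")
    .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("sorted"))
    .select("id", F.col("sorted.value").alias("sorted_list"))
)
no_udf_df.show(truncate=False)

Staying in native Spark expressions also avoids the serialization overhead that a Python udf incurs.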