I’m trying to collect groups of rows into sliding windows represented as vectors.
Given the example input:
+---+-----+-----+
| id|Label|group|
+---+-----+-----+
|  A|    T|    1|
|  B|    T|    1|
|  C|    F|    2|
|  D|    F|    2|
|  E|    F|    3|
|  F|    T|    3|
|  G|    F|    3|
|  H|    T|    3|
+---+-----+-----+
An expected output would be:
windows_size = 3
stride = 1
id_padding = ''
label_padding = 'f'

+-----+-------------+-------------+
|group|      Windows|       Labels|
+-----+-------------+-------------+
|    1|   [A, B, '']|    [T, T, f]|
|    2|   [C, D, '']|    [F, F, f]|
|    3|    [E, F, G]|    [F, T, F]|
|    3|    [F, G, H]|    [T, F, T]|
+-----+-------------+-------------+
My latest attempt produces tumbling windows without padding. Here’s my code:
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A", "T", 1),
    ("B", "T", 1),
    ("C", "F", 2),
    ("D", "F", 2),
    ("E", "F", 3),
    ("F", "T", 3),
    ("G", "F", 3),
    ("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])

grouping = 3

w2 = Window.partitionBy('group').orderBy('id')

# integer bucket index per row: rows 1-3 -> 0, rows 4-6 -> 1, ...
df = df.withColumn("rows", ((F.row_number().over(w2) - 1) / grouping).astype('int'))

(df.groupBy('group', 'rows')
   .agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))
   .drop('rows')
   .orderBy('group')
   .show())
I tried looking for variations of this, maybe by performing a SQL query like in this case or with some built-in SQL function such as ROWS N PRECEDING, but I didn’t manage to do what I want. Most results from the web focus on temporal sliding windows, but I’m trying to do it over rows instead.
Any help would be greatly appreciated.
EDIT:
I think I found a solution for the padding thanks to this answer.
I still need to organize the rows in sliding windows though…
Answer
One possible solution (not the most elegant one, but still functional) is the following.
In the window definition, it uses .rowsBetween to create a sliding window of the specified size; 0 indicates the current row.
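To make the frame semantics concrete, here is a minimal plain-Python sketch (no Spark involved) of what collecting over a frame like rowsBetween(0, size - 1) yields for one ordered partition. The helper name forward_frames is hypothetical and exists only for this illustration.

```python
def forward_frames(rows, size):
    """Mimic collect_list over rowsBetween(0, size - 1) for one ordered partition.

    Each row gets the list starting at itself and extending up to
    size - 1 rows ahead; frames near the end are simply shorter.
    """
    return [rows[i:i + size] for i in range(len(rows))]


group_3 = ["E", "F", "G", "H"]
frames = forward_frames(group_3, 3)
for row, frame in zip(group_3, frames):
    print(row, frame)
```

The last two frames here come out shorter than the requested size, which is why a later step has to either pad or discard incomplete windows.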
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 3
id_padding = "''"   # literal two-quote string, so the padding shows up as '' in the output
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
# frame: current row (0) plus the next size - 1 rows
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

(df.select(
    'group',
    F.collect_list('id').over(w_ordered_limited).alias('Windows'),
    F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
    F.count('group').over(w).alias('n'),            # number of rows in the group
    F.row_number().over(w_ordered).alias('n_row')   # position of the row within the group
)
# pad arrays and then slice them to the desired `size`
.withColumn('Windows', F.when(F.col('n') < size, F.slice(F.concat('Windows', F.array_repeat(F.lit(id_padding), size - 1)), 1, size))
                        .otherwise(F.col('Windows')))
.withColumn('Groups', F.when(F.col('n') < size, F.slice(F.concat('Groups', F.array_repeat(F.lit(label_padding), size - 1)), 1, size))
                       .otherwise(F.col('Groups')))
# filter out useless rows: keep one padded window for small groups,
# and only full-size windows for the others
.filter(
    ((F.col('n') < size) & (F.col('n_row') == 1))
    | ((F.col('n') >= size) & (F.size('Windows') == size)))
.drop('n', 'n_row')
).show()

+-----+----------+---------+
|group|   Windows|   Groups|
+-----+----------+---------+
|    1|[A, B, '']|[T, T, f]|
|    2|[C, D, '']|[F, F, f]|
|    3| [E, F, G]|[F, T, F]|
|    3| [F, G, H]|[T, F, T]|
+-----+----------+---------+
I suggest going through the solution step by step, one line of code at a time, to understand the logic behind it.
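As an aid for that walkthrough, the same pad-then-filter logic can be sketched in plain Python and used to check the Spark output on small inputs. This is only a reference sketch, not part of the answer's code: the function name sliding_windows and its stride parameter are assumptions made for illustration, and with a stride larger than 1 it assumes that trailing rows which do not fill a complete window are dropped.

```python
from itertools import groupby
from operator import itemgetter


def sliding_windows(rows, size, stride=1, id_padding="", label_padding="f"):
    """Build per-group sliding windows from (id, label, group) tuples.

    Groups shorter than `size` yield one padded window; larger groups
    yield every full window whose start index is a multiple of `stride`.
    """
    out = []
    # order by group, then by id, like partitionBy('group').orderBy('id')
    ordered = sorted(rows, key=lambda r: (r[2], r[0]))
    for group, members in groupby(ordered, key=itemgetter(2)):
        members = list(members)
        ids = [m[0] for m in members]
        labels = [m[1] for m in members]
        if len(members) < size:
            pad = size - len(members)
            out.append((group, ids + [id_padding] * pad, labels + [label_padding] * pad))
            continue
        for start in range(0, len(members) - size + 1, stride):
            out.append((group, ids[start:start + size], labels[start:start + size]))
    return out


data = [
    ("A", "T", 1), ("B", "T", 1),
    ("C", "F", 2), ("D", "F", 2),
    ("E", "F", 3), ("F", "T", 3), ("G", "F", 3), ("H", "T", 3),
]
result = sliding_windows(data, size=3)
for row in result:
    print(row)
```

With size=3 and stride=1 this gives one padded window each for groups 1 and 2 and two full windows for group 3, matching the table in the question.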