
PySpark: create sliding windows from rows with padding

I’m trying to collect groups of rows into sliding windows represented as vectors.

Given the example input:

+---+-----+-----+
| id|Label|group|
+---+-----+-----+
|  A|    T|    1|
|  B|    T|    1|
|  C|    F|    2|
|  D|    F|    2|
|  E|    F|    3|
|  F|    T|    3|
|  G|    F|    3|
|  H|    T|    3|
+---+-----+-----+

An expected output would be:

windows_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
+-----+-------------+-------------+
|group|      Windows|       Labels|
+-----+-------------+-------------+
|    1|   [A, B, '']|    [T, T, f]|
|    2|   [C, D, '']|    [F, F, f]|
|    3|    [E, F, G]|    [F, T, F]|
|    3|    [F, G, H]|    [T, F, T]|
+-----+-------------+-------------+

My latest attempt produces tumbling windows without padding. Here’s my code:

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A", "T", 1),
    ("B", "T", 1),
    ("C", "F", 2),
    ("D", "F", 2),
    ("E", "F", 3),
    ("F", "T", 3),
    ("G", "F", 3),
    ("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])

grouping = 3

w2 = Window.partitionBy('group').orderBy('id')

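# bucket rows into consecutive chunks of `grouping` rows within each group
df = df.withColumn("rows", ((F.row_number().over(w2) - 1) / grouping).astype('int'))

# the chained call is wrapped in parentheses so it can span several lines
(df.groupBy('group', 'rows')
   .agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))
   .drop('rows')
   .orderBy('group')
   .show())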

I tried looking for variations of this, maybe by performing a SQL query like in this case or with some built-in SQL function such as ROWS N PRECEDING, but I didn’t manage to do what I want. Most results from the web focus on temporal sliding windows, but I’m trying to do it over rows instead.

Any help would be greatly appreciated.

EDIT:
I think I found a solution for the padding thanks to this answer.

I still need to organize the rows in sliding windows though…


Answer

One possible solution (not the most elegant one, but still functional) is the following.
The window definition uses .rowsBetween(0, size - 1) to create a sliding frame of the specified size: 0 is the current row, and size - 1 is the offset of the last following row included in the frame.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 3
id_padding = "''"  # literal two-quote string, shown as '' in the output below
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

(df.select(
  'group',
  F.collect_list('id').over(w_ordered_limited).alias('Windows'),
  F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
  F.count('group').over(w).alias('n'),
  F.row_number().over(w_ordered).alias('n_row')
  )
  # pad arrays and then slice them to the desired `size`
  .withColumn('Windows', F.when(F.col('n') < size, F.slice(F.concat('Windows', F.array_repeat(F.lit(id_padding), size - 1)), 1, size))
                          .otherwise(F.col('Windows')))
  .withColumn('Groups',  F.when(F.col('n') < size, F.slice(F.concat('Groups', F.array_repeat(F.lit(label_padding), size - 1)), 1, size))
                          .otherwise(F.col('Groups')))
  # filter out useless rows
  .filter( ((F.col('n') < size) & (F.col('n_row') == 1)) 
           | ((F.col('n') >= size) & (F.size('Windows') == size)))
  .drop('n', 'n_row')
 ).show()

+-----+----------+---------+
|group|   Windows|   Groups|
+-----+----------+---------+
|    1|[A, B, '']|[T, T, f]|
|    2|[C, D, '']|[F, F, f]|
|    3| [E, F, G]|[F, T, F]|
|    3| [F, G, H]|[T, F, T]|
+-----+----------+---------+
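
To see why the filter step is needed, it can help to look at what a frame defined with rowsBetween(0, size - 1) collects before any padding or filtering. The following is a plain-Python emulation of that step, not Spark code; `forward_frames` is a hypothetical helper name, and the rows are the sample data from the question.

```python
from itertools import groupby

# (id, group) pairs from the question's sample data
rows = [("A", 1), ("B", 1), ("C", 2), ("D", 2),
        ("E", 3), ("F", 3), ("G", 3), ("H", 3)]
size = 3


def forward_frames(ids, size):
    """Emulate collect_list over rowsBetween(0, size - 1): one frame per row,
    holding the current row and up to size - 1 following rows."""
    return [ids[i:i + size] for i in range(len(ids))]


frames = {}
ordered = sorted(rows, key=lambda r: (r[1], r[0]))  # partition by group, order by id
for group, members in groupby(ordered, key=lambda r: r[1]):
    ids = [m[0] for m in members]
    frames[group] = forward_frames(ids, size)

for group, group_frames in frames.items():
    print(group, group_frames)
```

Every row produces a frame, so group 3 also yields the short tails [G, H] and [H], and groups 1 and 2 yield [B] and [D]. The filter keeps only the full-size frames for large groups and only the first (padded) frame for groups smaller than size.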

I suggest going through the solution step by step, one line of code at a time, to understand the logic behind it.
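
For cross-checking the Spark result on a small sample, a plain-Python reference can be handy. The sketch below is not part of the answer's code: `sliding_windows` is a hypothetical helper, and its `stride` handling (a window starts every `stride` rows, and a trailing partial window is dropped) is an assumption, since the question only shows stride = 1.

```python
def sliding_windows(values, size, stride=1, padding=None):
    """Return fixed-size windows over `values`.

    Sequences shorter than `size` give a single window padded on the right.
    Longer sequences give full windows starting every `stride` items
    (assumption: an incomplete trailing window is dropped, not padded).
    """
    values = list(values)
    if len(values) <= size:
        return [values + [padding] * (size - len(values))]
    return [values[i:i + size] for i in range(0, len(values) - size + 1, stride)]


# ids per group, taken from the question's sample data
groups = {1: ["A", "B"], 2: ["C", "D"], 3: ["E", "F", "G", "H"]}

for group, ids in groups.items():
    for window in sliding_windows(ids, size=3, stride=1, padding="''"):
        print(group, window)
```

Applying the same function to the labels with padding='f' gives the second column. This is only meant for verifying expected windows on data that fits in memory, not as a replacement for the window-function approach.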

User contributions licensed under: CC BY-SA