I have a DataFrame with a column named info, defined as follows:
+-------------------+------+
|          Timestamp| info |
+-------------------+------+
|2016-01-01 17:54:30|    0 |
|2016-02-01 12:16:18|    0 |
|2016-03-01 12:17:57|    0 |
|2016-04-01 10:05:21|    0 |
|2016-05-11 18:58:25|    1 |
|2016-06-11 11:18:29|    1 |
|2016-07-01 12:05:21|    0 |
|2016-08-11 11:58:25|    0 |
|2016-09-11 15:18:29|    1 |
+-------------------+------+
I would like to count the consecutive occurrences of 1s and insert 0 otherwise. The final column would be:
+-------------------+------+-----+
|          Timestamp| info | res |
+-------------------+------+-----+
|2016-01-01 17:54:30|    0 |   0 |
|2016-02-01 12:16:18|    0 |   0 |
|2016-03-01 12:17:57|    0 |   0 |
|2016-04-01 10:05:21|    0 |   0 |
|2016-05-11 18:58:25|    1 |   1 |
|2016-06-11 11:18:29|    1 |   2 |
|2016-07-01 12:05:21|    0 |   0 |
|2016-08-11 11:58:25|    0 |   0 |
|2016-09-11 15:18:29|    1 |   1 |
+-------------------+------+-----+
I tried the following, but it does not produce the expected result:
df_input = df_input.withColumn(
    "res",
    F.when(
        df_input.info == F.lag(df_input.info).over(w1),
        F.sum(F.lit(1)).over(w1)
    ).otherwise(0)
)
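As far as I can tell (assuming w1 is a window ordered by Timestamp), F.sum(F.lit(1)).over(w1) just counts all the rows in the window frame, so it never resets to 1 at the start of a new run of 1s, and the info == F.lag(info) condition is also true for consecutive 0s, which should stay 0.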
Answer
From Adding a column counting cumulative previous repeating values, credit to @blackbishop:
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([0, 0, 0, 0, 1, 1, 0, 0, 1], 'int').toDF('info')

(df.withColumn("ID", F.monotonically_increasing_id())
   .withColumn("group",
               F.row_number().over(Window.orderBy("ID"))
               - F.row_number().over(Window.partitionBy("info").orderBy("ID")))
   .withColumn("Result",
               F.when(F.col('info') != 0,
                      F.row_number().over(Window.partitionBy("group").orderBy("ID")))
                .otherwise(F.lit(0)))
   .orderBy("ID")
   .drop("ID", "group")
   .show())

+----+------+
|info|Result|
+----+------+
|   0|     0|
|   0|     0|
|   0|     0|
|   0|     0|
|   1|     1|
|   1|     2|
|   0|     0|
|   0|     0|
|   1|     1|
+----+------+
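The difference of the two row numbers stays constant within each run of identical info values, so it serves as a group id, and Result is simply the row number inside each run of 1s. If you would rather order by your existing Timestamp column instead of a generated ID, here is a rough equivalent sketch (untested, and it assumes the timestamps are unique): it flags the rows where info changes, turns a running sum of those flags into a group id, and numbers the rows inside each group.

from pyspark.sql import functions as F, Window

df = spark.createDataFrame(
    [("2016-01-01 17:54:30", 0), ("2016-02-01 12:16:18", 0),
     ("2016-03-01 12:17:57", 0), ("2016-04-01 10:05:21", 0),
     ("2016-05-11 18:58:25", 1), ("2016-06-11 11:18:29", 1),
     ("2016-07-01 12:05:21", 0), ("2016-08-11 11:58:25", 0),
     ("2016-09-11 15:18:29", 1)],
    ["Timestamp", "info"])

w = Window.orderBy("Timestamp")

result = (df
    # flag rows where info differs from the previous row (the first row counts as a change)
    .withColumn("change", (F.col("info") != F.lag("info", 1, -1).over(w)).cast("int"))
    # running sum of the change flags gives one group id per run of identical values
    .withColumn("group", F.sum("change").over(w))
    # number the rows inside each run of 1s, keep 0 elsewhere
    .withColumn("res",
                F.when(F.col("info") == 1,
                       F.row_number().over(Window.partitionBy("group").orderBy("Timestamp")))
                 .otherwise(F.lit(0)))
    .drop("change", "group"))

result.orderBy("Timestamp").show()

Both versions produce the res column from the question; the only difference is how the run groups are built.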