I have a column named info, defined as follows:
+-------------------+------+
|     Timestamp     | info |
+-------------------+------+
|2016-01-01 17:54:30|    0 |
|2016-02-01 12:16:18|    0 |
|2016-03-01 12:17:57|    0 |
|2016-04-01 10:05:21|    0 |
|2016-05-11 18:58:25|    1 |
|2016-06-11 11:18:29|    1 |
|2016-07-01 12:05:21|    0 |
|2016-08-11 11:58:25|    0 |
|2016-09-11 15:18:29|    1 |
+-------------------+------+
I would like to count the consecutive occurrences of 1s and insert 0 otherwise. The final column would be:
+-------------------+------+-----+
|     Timestamp     | info | res |
+-------------------+------+-----+
|2016-01-01 17:54:30|    0 |   0 |
|2016-02-01 12:16:18|    0 |   0 |
|2016-03-01 12:17:57|    0 |   0 |
|2016-04-01 10:05:21|    0 |   0 |
|2016-05-11 18:58:25|    1 |   1 |
|2016-06-11 11:18:29|    1 |   2 |
|2016-07-01 12:05:21|    0 |   0 |
|2016-08-11 11:58:25|    0 |   0 |
|2016-09-11 15:18:29|    1 |   1 |
+-------------------+------+-----+
I tried using the following function, but it didn’t work.
df_input = df_input.withColumn(
    "res",
    F.when(
        df_input.info == F.lag(df_input.info).over(w1),
        F.sum(F.lit(1)).over(w1)
    ).otherwise(0)
)
Answer
From Adding a column counting cumulative previous repeating values, credits to @blackbishop. The idea is a gaps-and-islands trick: the difference between a global row_number and a row_number partitioned by info stays constant within each run of consecutive identical values, so it can serve as a group key, and a row_number within that group counts the consecutive 1s.
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([0, 0, 0, 0, 1, 1, 0, 0, 1], 'int').toDF('info')

(df
 # Stable ordering key for the window functions.
 .withColumn("ID", F.monotonically_increasing_id())
 # Gaps-and-islands: the difference of the two row numbers is constant
 # within each run of consecutive identical info values.
 .withColumn("group",
             F.row_number().over(Window.orderBy("ID"))
             - F.row_number().over(Window.partitionBy("info").orderBy("ID")))
 # Number the rows inside each run; rows with info == 0 get 0.
 .withColumn("Result",
             F.when(F.col('info') != 0,
                    F.row_number().over(Window.partitionBy("group").orderBy("ID")))
              .otherwise(F.lit(0)))
 .orderBy("ID")
 .drop("ID", "group")
 .show())
+----+------+
|info|Result|
+----+------+
|   0|     0|
|   0|     0|
|   0|     0|
|   0|     0|
|   1|     1|
|   1|     2|
|   0|     0|
|   0|     0|
|   1|     1|
+----+------+
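Applied to the DataFrame from the question, here is a minimal sketch of the same approach, assuming the DataFrame is named df_input and that the existing Timestamp column can be used as the ordering key instead of a generated ID:

from pyspark.sql import functions as F, Window

# Same gaps-and-islands technique as above, ordered by Timestamp
# (assumes df_input has the Timestamp and info columns shown in the question).
w_all = Window.orderBy("Timestamp")
w_info = Window.partitionBy("info").orderBy("Timestamp")

result = (
    df_input
    # Constant within each run of consecutive identical info values.
    .withColumn("group",
                F.row_number().over(w_all) - F.row_number().over(w_info))
    # Count rows inside each run of 1s; 0 rows get 0.
    .withColumn("res",
                F.when(F.col("info") != 0,
                       F.row_number().over(Window.partitionBy("group").orderBy("Timestamp")))
                 .otherwise(F.lit(0)))
    .drop("group")
)
result.orderBy("Timestamp").show()

As with the answer above, the unpartitioned ordering window pulls all rows into a single partition, which is fine for small data but worth keeping in mind on large tables.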