I have a PySpark DataFrame with the columns start_time and end_time that define an interval per row. There is also a rate column, and I want to know whether a sub-interval (which is overlapped, by definition) carries different rate values; if it does, I want to keep the last record as the ground truth.
Inputs:
from pyspark.sql import Row

# So this:
input_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-04 00:00:00', rate=10), # OVERLAP: (1,4) and (2,3) and (3,5) and rate=10/20
              Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10), # OVERLAP: full overlap for (2,3) with (1,4)
              Row(start_time='2018-01-03 00:00:00', end_time='2018-01-05 00:00:00', rate=20), # OVERLAP: (3,5) and (1,4) and rate=10/20
              Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30), # NO OVERLAP: hole between (5,6)
              Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30)] # NO OVERLAP

df = spark.createDataFrame(input_rows)
df.show()
>>> +-------------------+-------------------+----+
    |         start_time|           end_time|rate|
    +-------------------+-------------------+----+
    |2018-01-01 00:00:00|2018-01-04 00:00:00|  10|
    |2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
    |2018-01-03 00:00:00|2018-01-05 00:00:00|  20|
    |2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
    |2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
    +-------------------+-------------------+----+

# To give you:
output_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10),
               Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10),
               Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=20),
               Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20),
               Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30),
               Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30)
              ]
final_df = spark.createDataFrame(output_rows)
final_df.show()
>>>
+-------------------+-------------------+----+
|         start_time|           end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00|  10|
|2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
|2018-01-03 00:00:00|2018-01-04 00:00:00|  20|
|2018-01-04 00:00:00|2018-01-05 00:00:00|  20|
|2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
|2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
+-------------------+-------------------+----+
Answer
You can compare each row's end_time with the next row's start_time, and replace end_time with that next start_time whenever the latter is smaller than the former.
from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'end_time2',
    # smallest start_time among all following rows, i.e. the next interval's start
    F.min('start_time').over(
        Window.orderBy('start_time')
              .rowsBetween(1, Window.unboundedFollowing)
    )
).select(
    'start_time',
    # truncate end_time to the next start_time when the intervals overlap
    F.when(
        F.col('end_time2') < F.col('end_time'),
        F.col('end_time2')
    ).otherwise(
        F.col('end_time')
    ).alias('end_time'),
    'rate'
)

df2.show()
+-------------------+-------------------+----+
|         start_time|           end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00|  10|
|2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
|2018-01-03 00:00:00|2018-01-05 00:00:00|  20|
|2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
|2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
+-------------------+-------------------+----+
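As a side note, because the window is ordered by start_time, the minimum of all following start_times is simply the next row's start_time. A minimal equivalent sketch using F.lead, assuming the same df as above (the df2_alt and next_start names are just illustrative):

from pyspark.sql import functions as F, Window

w = Window.orderBy('start_time')

# lead() grabs the next row's start_time directly; coalesce covers the last
# row, where lead() returns null, so its end_time is left unchanged.
df2_alt = df.withColumn('next_start', F.lead('start_time').over(w)).select(
    'start_time',
    F.least('end_time', F.coalesce('next_start', 'end_time')).alias('end_time'),
    'rate'
)
df2_alt.show()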