I have a PySpark DataFrame, created as below:
import pandas as pd
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local")
         .getOrCreate())
spark.conf.set("spark.sql.session.timeZone", "UTC")

INPUT = {
    "idx": [1, 1, 1, 1, 0],
    "consumption": [10.0, 20.0, 30.0, 40.0, 5.0],
    "valid_from": [
        pd.Timestamp("2019-01-01 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-03 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-06 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-01 00:00:00+00:00", tz="UTC"),
    ],
    "valid_to": [
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-05 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-05 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-08 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
    ],
}
df = pd.DataFrame.from_dict(INPUT)
spark.createDataFrame(df).show()

>>>
+---+-----------+-------------------+-------------------+
|idx|consumption|         valid_from|           valid_to|
+---+-----------+-------------------+-------------------+
|  1|       10.0|2019-01-01 00:00:00|2019-01-02 00:00:00|
|  1|       20.0|2019-01-02 00:00:00|2019-01-05 00:00:00|
|  1|       30.0|2019-01-03 00:00:00|2019-01-05 00:00:00|
|  1|       40.0|2019-01-06 00:00:00|2019-01-08 00:00:00|
|  0|        5.0|2019-01-01 00:00:00|2019-01-02 00:00:00|
+---+-----------+-------------------+-------------------+
I want to sum the consumption over the overlapping interval slices, per idx and per day. For example, for idx=1 the intervals 2019-01-02 to 2019-01-05 (20.0) and 2019-01-03 to 2019-01-05 (30.0) overlap on 2019-01-03 and 2019-01-04, so those days should show 20.0 + 30.0 = 50.0:
+---+-------------------+-----------+
|idx|          timestamp|consumption|
+---+-------------------+-----------+
|  1|2019-01-01 00:00:00|       10.0|
|  1|2019-01-02 00:00:00|       20.0|
|  1|2019-01-03 00:00:00|       50.0|
|  1|2019-01-04 00:00:00|       50.0|
|  1|2019-01-05 00:00:00|        0.0|
|  1|2019-01-06 00:00:00|       40.0|
|  1|2019-01-07 00:00:00|       40.0|
|  1|2019-01-08 00:00:00|        0.0|
|  0|2019-01-01 00:00:00|        5.0|
|  0|2019-01-02 00:00:00|        0.0|
+---+-------------------+-----------+
Answer
You can use sequence to expand the intervals into single days, explode the list of days, and then sum the consumption for each timestamp and idx:
from pyspark.sql import functions as F

input = spark.createDataFrame(df)

# expand each interval into single days, sum per day and idx, then restore the
# interval end dates that would otherwise be missing from the result
(input.withColumn("all_days", F.sequence("valid_from", F.date_sub("valid_to", 1)))
    .withColumn("timestamp", F.explode("all_days"))
    .groupBy("idx", "timestamp").sum("consumption")
    .withColumnRenamed("sum(consumption)", "consumption")
    .join(input.select("idx", "valid_to").distinct().withColumnRenamed("idx", "idx2"),
          (F.col("timestamp") == F.col("valid_to")) & (F.col("idx") == F.col("idx2")),
          "full_outer")
    .withColumn("idx", F.coalesce("idx", "idx2"))
    .withColumn("timestamp", F.coalesce("timestamp", "valid_to"))
    .drop("idx2", "valid_to")
    .fillna(0.0)
    .orderBy("idx", "timestamp")
    .show())
Output:
+---+-------------------+-----------+
|idx|          timestamp|consumption|
+---+-------------------+-----------+
|  0|2019-01-01 00:00:00|        5.0|
|  0|2019-01-02 00:00:00|        0.0|
|  1|2019-01-01 00:00:00|       10.0|
|  1|2019-01-02 00:00:00|       20.0|
|  1|2019-01-03 00:00:00|       50.0|
|  1|2019-01-04 00:00:00|       50.0|
|  1|2019-01-05 00:00:00|        0.0|
|  1|2019-01-06 00:00:00|       40.0|
|  1|2019-01-07 00:00:00|       40.0|
|  1|2019-01-08 00:00:00|        0.0|
+---+-------------------+-----------+
Remarks:
- sequence includes the last value of the interval, so one day has to be subtracted from valid_to.
- The missing end dates of the intervals are then restored using a full outer join with the original valid_to values, filling up null values with 0.0.
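To make the first remark concrete, here is a minimal sketch (assuming the same spark session as above; the one-row demo data is made up purely for illustration) showing that sequence keeps both endpoints, while shifting the upper bound back with date_sub drops the end date:

from pyspark.sql import functions as F

# hypothetical one-row demo to show the endpoint behaviour of sequence()
demo = (spark.createDataFrame([("2019-01-01", "2019-01-03")], ["valid_from", "valid_to"])
        .select(F.to_date("valid_from").alias("valid_from"),
                F.to_date("valid_to").alias("valid_to")))

demo.select(
    # inclusive on both ends: 2019-01-01, 2019-01-02, 2019-01-03
    F.sequence("valid_from", "valid_to").alias("inclusive"),
    # upper bound shifted back one day: 2019-01-01, 2019-01-02
    F.sequence("valid_from", F.date_sub("valid_to", 1)).alias("end_excluded"),
).show(truncate=False)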