I have a df that will join calendar date df,
df = spark.createDataFrame(
[
("1","2021-01-03", "2021-01-15",'2021-01-16',"2"),
("1","2021-01-20", "2021-01-25",'2021-01-22',"1"),
("1","2021-02-06", "2021-02-10",None,"2"),
],
["id","first_date", "end_date",'var1','var2']
)
+---+----------+----------+----------+----+
| id|first_date| end_date| var1|var2|
+---+----------+----------+----------+----+
| 1|2021-01-03|2021-01-15|2021-01-16| 2|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|
| 1|2021-02-06|2021-02-10| null| 2|
+---+----------+----------+----------+----+
Next Step: I am populating dates range of first and last date.
from pyspark.sql import functions as F
df = df.withColumn('first_date', F.col('first_date').cast('date'))
.withColumn('end_date', F.col('end_date').cast('date'))
df = df.withColumn("date", F.explode(F.expr("sequence(first_date, end_date, interval 1 day)")))
+---+----------+----------+----------+----+----------+
| id|first_date| end_date| var1|var2| date|
+---+----------+----------+----------+----+----------+
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-03|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-04|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-05|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-06|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-07|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-08|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-09|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-10|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-11|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-12|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-13|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-14|
| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-15|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-20|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-21|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-22|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-23|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-24|
| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-25|
| 1|2021-02-06|2021-02-10| null| 2|2021-02-06|
| 1|2021-02-06|2021-02-10| null| 2|2021-02-07|
| 1|2021-02-06|2021-02-10| null| 2|2021-02-08|
| 1|2021-02-06|2021-02-10| null| 2|2021-02-09|
| 1|2021-02-06|2021-02-10| null| 2|2021-02-10|
+---+----------+----------+----------+----+----------+
Step2: let’s say this is the calendar df that has id, and calendar dates
datedf = spark.createDataFrame([(1,)], ["id"])
datedf = datedf.withColumn("date", F.explode(F.expr("sequence(to_date('2021-01-01'), to_date('2021-02-28'), interval 1 day)")))
datedf.show()
+---+----------+
| id| date|
+---+----------+
| 1|2021-01-01|
| 1|2021-01-02|
| 1|2021-01-03|
| 1|2021-01-04|
| 1|2021-01-05|
| 1|2021-01-06|
| 1|2021-01-07|
| 1|2021-01-08|
| 1|2021-01-09|
| 1|2021-01-10|
| 1|2021-01-11|
| 1|2021-01-12|
| 1|2021-01-13|
| 1|2021-01-14|
| 1|2021-01-15|
| 1|2021-01-16|
| 1|2021-01-17|
| 1|2021-01-18|
| 1|2021-01-19|
| 1|2021-01-20|
| 1|2021-01-21|
| 1|2021-01-22|
| 1|2021-01-23|
| 1|2021-01-24|
| 1|2021-01-25|
| 1|2021-01-26|
| 1|2021-01-27|
| 1|2021-01-28|
| 1|2021-01-29|
| 1|2021-01-30|
+---+----------+
and i want to join with calendar dates
test1 = datedf.join(df, on=['date','id'],how='left') +----------+---+----------+----------+----------+----+ | date| id|first_date| end_date| var1|var2| +----------+---+----------+----------+----------+----+ |2021-01-01| 1| null| null|2021-01-16|null| |2021-01-02| 1| null| null|2021-01-16|null| |2021-01-03| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-04| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-05| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-06| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-07| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-08| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-09| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-10| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-11| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-12| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-13| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-14| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-15| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-16| 1| null| null|2021-01-22|null| |2021-01-17| 1| null| null|2021-01-22|null| |2021-01-18| 1| null| null|2021-01-22|null| |2021-01-19| 1| null| null|2021-01-22|null| |2021-01-20| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-21| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-22| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-23| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-24| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-25| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-26| 1| null| null| null|null| |2021-01-27| 1| null| null| null|null| |2021-01-28| 1| null| null| null|null| |2021-01-29| 1| null| null| null|null| |2021-01-30| 1| null| null| null|null| +----------+---+----------+----------+----------+----+
I would like to fill in those all null values based on the first non null values and if it’s null until the end of the date, last null values will take the precedence. so it will look like the following..
I could use window function and use .LAST(col,True) to fill up the gaps, but that has to be applied for all the null columns so it’s not efficient. I would like to find a way so that it will fill up the null values efficiently. any help would be appreciated.
Advertisement
Answer
You can add helper columns seq_begin and seq_end shown below, in order to generate date sequences that are consecutive, such that the join would not result in nulls:
from pyspark.sql import functions as F, Window
mindate = datedf.agg(F.min('date')).head()[0]
maxdate = datedf.agg(F.max('date')).head()[0]
df2 = df.withColumn(
'seq_begin',
F.when(
F.row_number().over(Window.partitionBy('id').orderBy('first_date')) == 1,
F.lit(mindate)
).otherwise(
F.date_add(F.lag('end_date').over(Window.partitionBy('id').orderBy('first_date')), 1)
)
).withColumn(
'seq_end',
F.when(
F.row_number().over(Window.partitionBy('id').orderBy(F.desc('end_date'))) == 1,
F.lit(maxdate)
).otherwise(F.col('end_date'))
).withColumn(
"date",
F.explode(F.expr("sequence(date(seq_begin), date(seq_end), interval 1 day)"))
)
result = datedf.join(df2, on=['date','id'], how='left').orderBy('date')
result.show(999)
+----------+---+----------+----------+----------+----+----------+----------+
| date| id|first_date| end_date| var1|var2| seq_begin| seq_end|
+----------+---+----------+----------+----------+----+----------+----------+
|2021-01-01| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-02| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-03| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-04| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-05| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-06| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-07| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-08| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-09| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-10| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-11| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-12| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-13| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-14| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-15| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15|
|2021-01-16| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-17| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-18| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-19| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-20| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-21| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-22| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-23| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-24| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-25| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25|
|2021-01-26| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-01-27| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-01-28| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-01-29| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-01-30| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-01-31| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-01| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-02| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-03| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-04| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-05| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-06| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-07| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-08| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-09| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-10| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-11| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-12| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-13| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-14| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-15| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-16| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-17| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-18| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-19| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-20| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-21| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-22| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-23| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-24| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-25| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-26| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-27| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
|2021-02-28| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28|
+----------+---+----------+----------+----------+----+----------+----------+
