I have a df that will join calendar date df,
df = spark.createDataFrame( [ ("1","2021-01-03", "2021-01-15",'2021-01-16',"2"), ("1","2021-01-20", "2021-01-25",'2021-01-22',"1"), ("1","2021-02-06", "2021-02-10",None,"2"), ], ["id","first_date", "end_date",'var1','var2'] ) +---+----------+----------+----------+----+ | id|first_date| end_date| var1|var2| +---+----------+----------+----------+----+ | 1|2021-01-03|2021-01-15|2021-01-16| 2| | 1|2021-01-20|2021-01-25|2021-01-22| 1| | 1|2021-02-06|2021-02-10| null| 2| +---+----------+----------+----------+----+
Next Step: I am populating dates range of first and last date.
from pyspark.sql import functions as F df = df.withColumn('first_date', F.col('first_date').cast('date')) .withColumn('end_date', F.col('end_date').cast('date')) df = df.withColumn("date", F.explode(F.expr("sequence(first_date, end_date, interval 1 day)"))) +---+----------+----------+----------+----+----------+ | id|first_date| end_date| var1|var2| date| +---+----------+----------+----------+----+----------+ | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-03| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-04| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-05| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-06| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-07| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-08| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-09| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-10| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-11| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-12| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-13| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-14| | 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-15| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-20| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-21| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-22| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-23| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-24| | 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-25| | 1|2021-02-06|2021-02-10| null| 2|2021-02-06| | 1|2021-02-06|2021-02-10| null| 2|2021-02-07| | 1|2021-02-06|2021-02-10| null| 2|2021-02-08| | 1|2021-02-06|2021-02-10| null| 2|2021-02-09| | 1|2021-02-06|2021-02-10| null| 2|2021-02-10| +---+----------+----------+----------+----+----------+
Step2: let’s say this is the calendar df that has id, and calendar dates
datedf = spark.createDataFrame([(1,)], ["id"]) datedf = datedf.withColumn("date", F.explode(F.expr("sequence(to_date('2021-01-01'), to_date('2021-02-28'), interval 1 day)"))) datedf.show() +---+----------+ | id| date| +---+----------+ | 1|2021-01-01| | 1|2021-01-02| | 1|2021-01-03| | 1|2021-01-04| | 1|2021-01-05| | 1|2021-01-06| | 1|2021-01-07| | 1|2021-01-08| | 1|2021-01-09| | 1|2021-01-10| | 1|2021-01-11| | 1|2021-01-12| | 1|2021-01-13| | 1|2021-01-14| | 1|2021-01-15| | 1|2021-01-16| | 1|2021-01-17| | 1|2021-01-18| | 1|2021-01-19| | 1|2021-01-20| | 1|2021-01-21| | 1|2021-01-22| | 1|2021-01-23| | 1|2021-01-24| | 1|2021-01-25| | 1|2021-01-26| | 1|2021-01-27| | 1|2021-01-28| | 1|2021-01-29| | 1|2021-01-30| +---+----------+
and i want to join with calendar dates
test1 = datedf.join(df, on=['date','id'],how='left') +----------+---+----------+----------+----------+----+ | date| id|first_date| end_date| var1|var2| +----------+---+----------+----------+----------+----+ |2021-01-01| 1| null| null|2021-01-16|null| |2021-01-02| 1| null| null|2021-01-16|null| |2021-01-03| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-04| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-05| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-06| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-07| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-08| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-09| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-10| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-11| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-12| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-13| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-14| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-15| 1|2021-01-03|2021-01-15|2021-01-16| 2| |2021-01-16| 1| null| null|2021-01-22|null| |2021-01-17| 1| null| null|2021-01-22|null| |2021-01-18| 1| null| null|2021-01-22|null| |2021-01-19| 1| null| null|2021-01-22|null| |2021-01-20| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-21| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-22| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-23| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-24| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-25| 1|2021-01-20|2021-01-25|2021-01-22| 1| |2021-01-26| 1| null| null| null|null| |2021-01-27| 1| null| null| null|null| |2021-01-28| 1| null| null| null|null| |2021-01-29| 1| null| null| null|null| |2021-01-30| 1| null| null| null|null| +----------+---+----------+----------+----------+----+
I would like to fill in those all null values based on the first non null values and if it’s null until the end of the date, last null values will take the precedence. so it will look like the following..
I could use window function and use .LAST(col,True) to fill up the gaps, but that has to be applied for all the null columns so it’s not efficient. I would like to find a way so that it will fill up the null values efficiently. any help would be appreciated.
Advertisement
Answer
You can add helper columns seq_begin
and seq_end
shown below, in order to generate date sequences that are consecutive, such that the join would not result in nulls:
from pyspark.sql import functions as F, Window mindate = datedf.agg(F.min('date')).head()[0] maxdate = datedf.agg(F.max('date')).head()[0] df2 = df.withColumn( 'seq_begin', F.when( F.row_number().over(Window.partitionBy('id').orderBy('first_date')) == 1, F.lit(mindate) ).otherwise( F.date_add(F.lag('end_date').over(Window.partitionBy('id').orderBy('first_date')), 1) ) ).withColumn( 'seq_end', F.when( F.row_number().over(Window.partitionBy('id').orderBy(F.desc('end_date'))) == 1, F.lit(maxdate) ).otherwise(F.col('end_date')) ).withColumn( "date", F.explode(F.expr("sequence(date(seq_begin), date(seq_end), interval 1 day)")) ) result = datedf.join(df2, on=['date','id'], how='left').orderBy('date') result.show(999) +----------+---+----------+----------+----------+----+----------+----------+ | date| id|first_date| end_date| var1|var2| seq_begin| seq_end| +----------+---+----------+----------+----------+----+----------+----------+ |2021-01-01| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-02| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-03| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-04| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-05| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-06| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-07| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-08| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-09| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-10| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-11| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-12| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-13| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-14| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-15| 1|2021-01-03|2021-01-15|2021-01-16| 2|2021-01-01|2021-01-15| |2021-01-16| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-17| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-18| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-19| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-20| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-21| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-22| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-23| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-24| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-25| 1|2021-01-20|2021-01-25|2021-01-22| 1|2021-01-16|2021-01-25| |2021-01-26| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-01-27| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-01-28| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-01-29| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-01-30| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-01-31| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-01| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-02| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-03| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-04| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-05| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-06| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-07| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-08| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-09| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-10| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-11| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-12| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-13| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-14| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-15| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-16| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-17| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-18| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-19| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-20| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-21| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-22| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-23| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-24| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-25| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-26| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-27| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| |2021-02-28| 1|2021-02-06|2021-02-10| null| 2|2021-01-26|2021-02-28| +----------+---+----------+----------+----------+----+----------+----------+