Skip to content
Advertisement

how to fill in null values in Pyspark

I have a df that will join calendar date df,

df = spark.createDataFrame(
    [
        ("1","2021-01-03", "2021-01-15",'2021-01-16',"2"), 
        ("1","2021-01-20", "2021-01-25",'2021-01-22',"1"),
        ("1","2021-02-06", "2021-02-10",None,"2"),
    ],
    ["id","first_date", "end_date",'var1','var2']  
)
+---+----------+----------+----------+----+
| id|first_date|  end_date|      var1|var2|
+---+----------+----------+----------+----+
|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|  1|2021-02-06|2021-02-10|      null|   2|
+---+----------+----------+----------+----+

Next Step: I am populating dates range of first and last date.

from pyspark.sql import functions as F

df = df.withColumn('first_date', F.col('first_date').cast('date'))
       .withColumn('end_date', F.col('end_date').cast('date'))
df = df.withColumn("date", F.explode(F.expr("sequence(first_date, end_date, interval 1 day)")))
+---+----------+----------+----------+----+----------+
| id|first_date|  end_date|      var1|var2|      date|
+---+----------+----------+----------+----+----------+
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-03|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-04|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-05|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-06|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-07|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-08|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-09|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-10|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-11|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-12|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-13|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-14|
|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-15|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-20|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-21|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-22|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-23|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-24|
|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-25|
|  1|2021-02-06|2021-02-10|      null|   2|2021-02-06|
|  1|2021-02-06|2021-02-10|      null|   2|2021-02-07|
|  1|2021-02-06|2021-02-10|      null|   2|2021-02-08|
|  1|2021-02-06|2021-02-10|      null|   2|2021-02-09|
|  1|2021-02-06|2021-02-10|      null|   2|2021-02-10|
+---+----------+----------+----------+----+----------+

Step2: let’s say this is the calendar df that has id, and calendar dates

datedf = spark.createDataFrame([(1,)], ["id"])
datedf = datedf.withColumn("date", F.explode(F.expr("sequence(to_date('2021-01-01'), to_date('2021-02-28'), interval 1 day)")))

datedf.show()
+---+----------+
| id|      date|
+---+----------+
|  1|2021-01-01|
|  1|2021-01-02|
|  1|2021-01-03|
|  1|2021-01-04|
|  1|2021-01-05|
|  1|2021-01-06|
|  1|2021-01-07|
|  1|2021-01-08|
|  1|2021-01-09|
|  1|2021-01-10|
|  1|2021-01-11|
|  1|2021-01-12|
|  1|2021-01-13|
|  1|2021-01-14|
|  1|2021-01-15|
|  1|2021-01-16|
|  1|2021-01-17|
|  1|2021-01-18|
|  1|2021-01-19|
|  1|2021-01-20|
|  1|2021-01-21|
|  1|2021-01-22|
|  1|2021-01-23|
|  1|2021-01-24|
|  1|2021-01-25|
|  1|2021-01-26|
|  1|2021-01-27|
|  1|2021-01-28|
|  1|2021-01-29|
|  1|2021-01-30|
+---+----------+

and i want to join with calendar dates

test1 = datedf.join(df, on=['date','id'],how='left')
+----------+---+----------+----------+----------+----+
|      date| id|first_date|  end_date|      var1|var2|
+----------+---+----------+----------+----------+----+
|2021-01-01|  1|      null|      null|2021-01-16|null|
|2021-01-02|  1|      null|      null|2021-01-16|null|
|2021-01-03|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-04|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-05|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-06|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-07|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-08|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-09|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-10|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-11|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-12|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-13|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-14|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-15|  1|2021-01-03|2021-01-15|2021-01-16|   2|
|2021-01-16|  1|      null|      null|2021-01-22|null|
|2021-01-17|  1|      null|      null|2021-01-22|null|
|2021-01-18|  1|      null|      null|2021-01-22|null|
|2021-01-19|  1|      null|      null|2021-01-22|null|
|2021-01-20|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-21|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-22|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-23|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-24|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-25|  1|2021-01-20|2021-01-25|2021-01-22|   1|
|2021-01-26|  1|      null|      null|      null|null|
|2021-01-27|  1|      null|      null|      null|null|
|2021-01-28|  1|      null|      null|      null|null|
|2021-01-29|  1|      null|      null|      null|null|
|2021-01-30|  1|      null|      null|      null|null|
+----------+---+----------+----------+----------+----+

I would like to fill in those all null values based on the first non null values and if it’s null until the end of the date, last null values will take the precedence. so it will look like the following..

I could use window function and use .LAST(col,True) to fill up the gaps, but that has to be applied for all the null columns so it’s not efficient. I would like to find a way so that it will fill up the null values efficiently. any help would be appreciated.

enter image description here

Advertisement

Answer

You can add helper columns seq_begin and seq_end shown below, in order to generate date sequences that are consecutive, such that the join would not result in nulls:

from pyspark.sql import functions as F, Window

mindate = datedf.agg(F.min('date')).head()[0]
maxdate = datedf.agg(F.max('date')).head()[0]

df2 = df.withColumn(
    'seq_begin', 
    F.when(
        F.row_number().over(Window.partitionBy('id').orderBy('first_date')) == 1, 
        F.lit(mindate)
    ).otherwise(
        F.date_add(F.lag('end_date').over(Window.partitionBy('id').orderBy('first_date')), 1)
    )
).withColumn(
    'seq_end', 
    F.when(
        F.row_number().over(Window.partitionBy('id').orderBy(F.desc('end_date'))) == 1, 
        F.lit(maxdate)
    ).otherwise(F.col('end_date'))
).withColumn(
    "date", 
    F.explode(F.expr("sequence(date(seq_begin), date(seq_end), interval 1 day)"))
)

result = datedf.join(df2, on=['date','id'], how='left').orderBy('date')

result.show(999)
+----------+---+----------+----------+----------+----+----------+----------+
|      date| id|first_date|  end_date|      var1|var2| seq_begin|   seq_end|
+----------+---+----------+----------+----------+----+----------+----------+
|2021-01-01|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-02|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-03|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-04|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-05|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-06|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-07|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-08|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-09|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-10|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-11|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-12|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-13|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-14|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-15|  1|2021-01-03|2021-01-15|2021-01-16|   2|2021-01-01|2021-01-15|
|2021-01-16|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-17|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-18|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-19|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-20|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-21|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-22|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-23|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-24|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-25|  1|2021-01-20|2021-01-25|2021-01-22|   1|2021-01-16|2021-01-25|
|2021-01-26|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-01-27|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-01-28|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-01-29|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-01-30|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-01-31|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-01|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-02|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-03|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-04|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-05|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-06|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-07|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-08|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-09|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-10|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-11|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-12|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-13|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-14|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-15|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-16|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-17|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-18|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-19|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-20|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-21|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-22|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-23|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-24|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-25|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-26|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-27|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
|2021-02-28|  1|2021-02-06|2021-02-10|      null|   2|2021-01-26|2021-02-28|
+----------+---+----------+----------+----------+----+----------+----------+
User contributions licensed under: CC BY-SA
9 People found this is helpful
Advertisement