
PySpark UDF to explode records based on date range

I am a noob in Python and PySpark. I need to explode a patient row into yearly date ranges, so that each patient ends up with one row per year.

I wrote a Python function (below) and registered it as a PySpark UDF (having read many articles here). My problem is that it fails when I apply it to my PySpark dataframe. The function returns 4 values, but I could not figure out how to apply it with .withColumn or any other method/function.

While registering the Python function as a UDF, I could not work out what data type it should be declared to return. I am sure I messed up somewhere, but I don't know where.

Please help

import sys
import pyspark.sql.functions as f
import pyspark.sql as t
from pyspark.sql.types import *
import datetime
import dateutil

def process_data(identifier, eff_date, end_date):
    eff_year = int(eff_date.split("/")[2])
    end_year = int(end_date.split("/")[2])
    
    current_year = eff_year
    while current_year <= end_year:
        
        if str(current_year) in eff_date:
            first_date_in_year = eff_date
        else:
            first_date_in_year = "%02d/%02d/%4d" % (1, 1, current_year)
            
        if str(current_year) in end_date:
            last_date_in_year = end_date
        else:
            last_date_in_year = "%02d/%02d/%4d" % (12, 31, current_year)
            
        print("|".join([identifier, first_date_in_year, last_date_in_year]))
        current_year += 1

        # NOTE: this return is inside the while loop, so only the first year is produced
        return identifier, first_date_in_year, last_date_in_year, current_year


def main():
    
    with open('date.input') as f:
        for line in f.read().splitlines():
            identifier, eff_date, end_date = line.split('|')
            process_data(identifier, eff_date, end_date)

process_data_UDF = f.udf(process_data, StringType())

df_explode = df.withColumn("NEWCOLS", process_data_UDF("INDV_ID", "EFF_DATE", "END_DATE"))
df_explode.show()

Input Data

Identifier  Eff_Date   End_Date
A0001       2/1/2019   3/31/2021
B0001       6/1/2020   11/30/2020
C0001       6/1/2020   1/31/2021
D0001       6/1/2020   12/31/2021
E0001       1/1/2019   6/30/2019
E0001       1/1/2020   12/31/2020

Output Data

Identifier  New_Eff_Date  New_End_Date
A0001       2/1/2019      12/31/2019
A0001       01/01/2020    12/31/2020
A0001       01/01/2021    3/31/2021
B0001       6/1/2020      11/30/2020
C0001       6/1/2020      12/31/2020
C0001       01/01/2021    1/31/2021
D0001       6/1/2020      12/31/2020
D0001       01/01/2021    12/31/2021
E0001       1/1/2019      6/30/2019
E0001       1/1/2020      12/31/2020


Answer

My input dataframe, based on your data:

from datetime import date

b = "Identifier Eff_Date    End_Date".split()

a = [
    ["A0001", date(2019, 2, 1), date(2021, 3, 31)],
]

df = spark.createDataFrame(a, b)

df.printSchema()
root
 |-- Identifier: string (nullable = true)
 |-- Eff_Date: date (nullable = true)
 |-- End_Date: date (nullable = true)

df.show()
+----------+----------+----------+                                              
|Identifier|  Eff_Date|  End_Date|
+----------+----------+----------+
|     A0001|2019-02-01|2021-03-31|
+----------+----------+----------+
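
If you want the full sample from the question, the remaining rows can be added to the same list. The names a_full and df_full below are just mine, to keep them apart from the single-row df whose outputs are shown in the rest of the answer; the values are your input data transcribed into date objects:

a_full = [
    ["A0001", date(2019, 2, 1), date(2021, 3, 31)],
    ["B0001", date(2020, 6, 1), date(2020, 11, 30)],
    ["C0001", date(2020, 6, 1), date(2021, 1, 31)],
    ["D0001", date(2020, 6, 1), date(2021, 12, 31)],
    ["E0001", date(2019, 1, 1), date(2019, 6, 30)],
    ["E0001", date(2020, 1, 1), date(2020, 12, 31)],
]
df_full = spark.createDataFrame(a_full, b)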

Here is the function you need:

def generate_dates(start, end):
    out = []
    # one entry per calendar year covered by [start, end]
    for i in range((end.year - start.year) + 1):
        s = date(start.year + i, 1, 1)    # Jan 1 of that year
        e = date(start.year + i, 12, 31)  # Dec 31 of that year
        out.append((max(s, start), min(e, end)))  # clamp to the original range
    return out
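
As a quick sanity check in plain Python, here is what the helper returns for the A0001 row above, one clamped (start, end) pair per calendar year:

generate_dates(date(2019, 2, 1), date(2021, 3, 31))
# [(datetime.date(2019, 2, 1), datetime.date(2019, 12, 31)),
#  (datetime.date(2020, 1, 1), datetime.date(2020, 12, 31)),
#  (datetime.date(2021, 1, 1), datetime.date(2021, 3, 31))]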

Let's turn that into a UDF, use it on the dataframe with explode, and you should have your expected result:

from pyspark.sql import functions as F, types as T


# the UDF returns, per row, an array of (New_Eff_Date, New_End_Date) structs, one per year
schema = T.ArrayType(
    T.StructType(
        [
            T.StructField("New_Eff_Date", T.DateType()),
            T.StructField("New_End_Date", T.DateType()),
        ]
    )
)

generate_dates_udf = F.udf(generate_dates, schema)


df.withColumn(
    "new_dates", generate_dates_udf(F.col("Eff_Date"), F.col("End_Date"))
).withColumn("new_date", F.explode("new_dates")).select(
    "Identifier",
    "new_date.New_Eff_Date",
    "new_date.New_End_Date",
).show()
+----------+------------+------------+
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
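
One caveat: in your own dataframe the dates appear to be strings such as "2/1/2019" rather than date columns. If so, convert them before calling the UDF. This is a sketch assuming the column names from your withColumn call (EFF_DATE, END_DATE) and a M/d/yyyy format:

# assumes the source columns are strings in M/d/yyyy format (as in the question's sample)
df = (
    df.withColumn("Eff_Date", F.to_date("EFF_DATE", "M/d/yyyy"))
    .withColumn("End_Date", F.to_date("END_DATE", "M/d/yyyy"))
)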

The same logic in pure Spark, without a UDF (F.sequence requires Spark 2.4+):

df = (
    # one row per calendar year between Eff_Date and End_Date
    df.withColumn("year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date"))))
    # January 1st of that year
    .withColumn(
        "s",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("01"), F.lit("01")).cast(
            "date"
        ),
    )
    # December 31st of that year
    .withColumn(
        "e",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("12"), F.lit("31")).cast(
            "date"
        ),
    )
    # clamp each yearly window to the original [Eff_Date, End_Date] range
    .select(
        "Identifier",
        F.greatest(F.col("Eff_Date"), F.col("s")).alias("New_Eff_Date"),
        F.least(F.col("End_Date"), F.col("e")).alias("New_End_Date"),
    )
)

df.show()
+----------+------------+------------+                                          
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
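
On Spark 3.0 and later, the year boundaries can also be built with the make_date SQL function instead of assembling date strings; this is a sketch of the equivalent logic, not a drop-in from the answer above:

# assumes Spark 3.0+ for make_date; same explode-and-clamp idea as above
df = (
    df.withColumn("year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date"))))
    .select(
        "Identifier",
        F.greatest("Eff_Date", F.expr("make_date(year, 1, 1)")).alias("New_Eff_Date"),
        F.least("End_Date", F.expr("make_date(year, 12, 31)")).alias("New_End_Date"),
    )
)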