I am a noob in Python and PySpark. I need to explode each patient row into yearly date ranges, so that each patient has one row per calendar year.
I wrote a Python function (below) and registered it as a PySpark UDF (having read many articles here). My problem is that when I apply it to my PySpark dataframe, it fails. My function returns 4 values, but I could not figure out how to apply it with .withColumn or any other method.
While registering the Python function as a UDF, I could not work out what return data type to declare. I am sure I messed up somewhere, but I don't know where.
Please help.
```python
import sys
import pyspark.sql.functions as f
import pyspark.sql as t
from pyspark.sql.types import *
import datetime
import dateutil


def process_data(identifier, eff_date, end_date):
    eff_year = int(eff_date.split("/")[2])
    end_year = int(end_date.split("/")[2])
    current_year = eff_year
    while current_year <= end_year:
        if str(current_year) in eff_date:
            first_date_in_year = eff_date
        else:
            first_date_in_year = "%02d/%02d/%4d" % (1, 1, current_year)
        if str(current_year) in end_date:
            last_date_in_year = end_date
        else:
            last_date_in_year = "%02d/%02d/%4d" % (12, 31, current_year)
        print("|".join([identifier, first_date_in_year, last_date_in_year]))
        current_year += 1
    return identifier, first_date_in_year, last_date_of_year, current_year


def main():
    with open('date.input') as f:
        for line in f.read().splitlines():
            identifier, eff_date, end_date = line.split('|')
            process_data(identifier, eff_date, end_date)


process_data_UDF = f.udf(process_data, StringType())
df_explode = df.withColumn("NEWCOLS", process_data_UDF("INDV_ID", "EFF_DATE", "END_DATE"))
df_explode.show()
```
Input Data
Identifier | Eff_Date | End_Date |
---|---|---|
A0001 | 2/1/2019 | 3/31/2021 |
B0001 | 6/1/2020 | 11/30/2020 |
C0001 | 6/1/2020 | 1/31/2021 |
D0001 | 6/1/2020 | 12/31/2021 |
E0001 | 1/1/2019 | 6/30/2019 |
E0001 | 1/1/2020 | 12/31/2020 |
Output Data
Identifier | New_Eff_Date | New_End_Date |
---|---|---|
A0001 | 2/1/2019 | 12/31/2019 |
A0001 | 01/01/2020 | 12/31/2020 |
A0001 | 01/01/2021 | 3/31/2021 |
B0001 | 6/1/2020 | 11/30/2020 |
C0001 | 6/1/2020 | 12/31/2020 |
C0001 | 01/01/2021 | 1/31/2021 |
D0001 | 6/1/2020 | 12/31/2020 |
D0001 | 01/01/2021 | 12/31/2021 |
E0001 | 1/1/2019 | 6/30/2019 |
E0001 | 1/1/2020 | 12/31/2020 |
Answer
My input dataframe, based on your data:
```python
from datetime import date

b = "Identifier Eff_Date End_Date".split()
a = [
    ["A0001", date(2019, 2, 1), date(2021, 3, 31)],
]
df = spark.createDataFrame(a, b)

df.printSchema()
# root
#  |-- Identifier: string (nullable = true)
#  |-- Eff_Date: date (nullable = true)
#  |-- End_Date: date (nullable = true)

df.show()
# +----------+----------+----------+
# |Identifier|  Eff_Date|  End_Date|
# +----------+----------+----------+
# |     A0001|2019-02-01|2021-03-31|
# +----------+----------+----------+
```
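Note that this assumes Eff_Date and End_Date are already DateType columns. In your sample the dates are M/d/yyyy strings, so you may need to parse them first; a minimal sketch, assuming that exact pattern:

```python
from pyspark.sql import functions as F

# Assumption: the raw columns arrive as strings like "2/1/2019" (M/d/yyyy).
df = (
    df.withColumn("Eff_Date", F.to_date("Eff_Date", "M/d/yyyy"))
      .withColumn("End_Date", F.to_date("End_Date", "M/d/yyyy"))
)
```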
Here is the function you need:
```python
def generate_dates(start, end):
    # For each calendar year touched by [start, end], emit that year's
    # boundaries, clamped to the overall interval.
    out = []
    for i in range((end.year - start.year) + 1):
        s = date(start.year + i, 1, 1)
        e = date(start.year + i, 12, 31)
        out.append((max(s, start), min(e, end)))
    return out
```
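You can sanity-check it in plain Python before touching Spark; for the A0001 row it yields one (start, end) pair per year:

```python
from datetime import date

# Quick check against the A0001 sample row.
print(generate_dates(date(2019, 2, 1), date(2021, 3, 31)))
# [(datetime.date(2019, 2, 1), datetime.date(2019, 12, 31)),
#  (datetime.date(2020, 1, 1), datetime.date(2020, 12, 31)),
#  (datetime.date(2021, 1, 1), datetime.date(2021, 3, 31))]
```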
Let's transform that into a UDF and use it in the dataframe with explode, and you should have your expected result:
```python
from pyspark.sql import functions as F, types as T

schema = T.ArrayType(
    T.StructType(
        [
            T.StructField("New_Eff_Date", T.DateType()),
            T.StructField("New_End_Date", T.DateType()),
        ]
    )
)

generate_dates_udf = F.udf(generate_dates, schema)

df.withColumn(
    "new_dates", generate_dates_udf(F.col("Eff_Date"), F.col("End_Date"))
).withColumn("new_date", F.explode("new_dates")).select(
    "Identifier",
    "new_date.New_Eff_Date",
    "new_date.New_End_Date",
).show()

# +----------+------------+------------+
# |Identifier|New_Eff_Date|New_End_Date|
# +----------+------------+------------+
# |     A0001|  2019-02-01|  2019-12-31|
# |     A0001|  2020-01-01|  2020-12-31|
# |     A0001|  2021-01-01|  2021-03-31|
# +----------+------------+------------+
```
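If you want the output back in the slash-separated string format from your input rather than Spark's default yyyy-MM-dd, here is a sketch, assuming you assign the exploded dataframe to a variable (called result here, a name of my choosing) instead of calling .show() directly:

```python
# Hypothetical final step: render the dates as MM/dd/yyyy strings.
result.select(
    "Identifier",
    F.date_format("New_Eff_Date", "MM/dd/yyyy").alias("New_Eff_Date"),
    F.date_format("New_End_Date", "MM/dd/yyyy").alias("New_End_Date"),
).show()
```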
The same result, but in pure Spark (no UDF):
```python
df = (
    df.withColumn(
        "year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date")))
    )
    .withColumn(
        "s",
        F.concat_ws(
            "-", F.col("year").cast("string"), F.lit("01"), F.lit("01")
        ).cast("date"),
    )
    .withColumn(
        "e",
        F.concat_ws(
            "-", F.col("year").cast("string"), F.lit("12"), F.lit("31")
        ).cast("date"),
    )
    .select(
        "Identifier",
        F.greatest(F.col("Eff_Date"), F.col("s")).alias("New_Eff_Date"),
        F.least(F.col("End_Date"), F.col("e")).alias("New_End_Date"),
    )
)

df.show()
# +----------+------------+------------+
# |Identifier|New_Eff_Date|New_End_Date|
# +----------+------------+------------+
# |     A0001|  2019-02-01|  2019-12-31|
# |     A0001|  2020-01-01|  2020-12-31|
# |     A0001|  2021-01-01|  2021-03-31|
# +----------+------------+------------+
```
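On Spark 3.0+, the concat_ws/cast dance for the year boundaries can be replaced with make_date, which reads a bit more directly. A sketch of the same logic under that version assumption:

```python
# Same logic on Spark 3.0+, building the year-boundary dates with make_date.
df = (
    df.withColumn(
        "year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date")))
    )
    .select(
        "Identifier",
        F.greatest("Eff_Date", F.make_date("year", F.lit(1), F.lit(1))).alias("New_Eff_Date"),
        F.least("End_Date", F.make_date("year", F.lit(12), F.lit(31))).alias("New_End_Date"),
    )
)
```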