I am a noob in Python and PySpark. I need to explode each patient row into yearly date ranges, so that each patient has one row per year.
I wrote a Python function (below) and registered it as a PySpark UDF (having read many articles here). My problem is that when I apply it to my PySpark DataFrame, it fails. My function returns 4 values, but I could not figure out how to apply it with .withColumn or any other method/function using this UDF.
While registering the Python function as a UDF, I could not work out what data type it should declare as the return type. I am sure I messed up somewhere, but I don't know where.
Please help.
```python
import sys
import pyspark.sql.functions as f
import pyspark.sql as t
from pyspark.sql.types import *
import datetime
import dateutil


def process_data(identifier, eff_date, end_date):
    eff_year = int(eff_date.split("/")[2])
    end_year = int(end_date.split("/")[2])
    current_year = eff_year
    while current_year <= end_year:
        if str(current_year) in eff_date:
            first_date_in_year = eff_date
        else:
            first_date_in_year = "%02d/%02d/%4d" % (1, 1, current_year)
        if str(current_year) in end_date:
            last_date_in_year = end_date
        else:
            last_date_in_year = "%02d/%02d/%4d" % (12, 31, current_year)
        print("|".join([identifier, first_date_in_year, last_date_in_year]))
        current_year += 1
    return identifier, first_date_in_year, last_date_of_year, current_year


def main():
    with open('date.input') as f:
        for line in f.read().splitlines():
            identifier, eff_date, end_date = line.split('|')
            process_data(identifier, eff_date, end_date)


process_data_UDF = f.udf(process_data, StringType())
df_explode = df.withColumn("NEWCOLS", process_data_UDF("INDV_ID", "EFF_DATE", "END_DATE"))
df_explode.show()
```
Input Data
| Identifier | Eff_Date | End_Date |
|---|---|---|
| A0001 | 2/1/2019 | 3/31/2021 |
| B0001 | 6/1/2020 | 11/30/2020 |
| C0001 | 6/1/2020 | 1/31/2021 |
| D0001 | 6/1/2020 | 12/31/2021 |
| E0001 | 1/1/2019 | 6/30/2019 |
| E0001 | 1/1/2020 | 12/31/2020 |
Expected Output Data
| Identifier | New_Eff_Date | New_End_Date |
|---|---|---|
| A0001 | 2/1/2019 | 12/31/2019 |
| A0001 | 01/01/2020 | 12/31/2020 |
| A0001 | 01/01/2021 | 3/31/2021 |
| B0001 | 6/1/2020 | 11/30/2020 |
| C0001 | 6/1/2020 | 12/31/2020 |
| C0001 | 01/01/2021 | 1/31/2021 |
| D0001 | 6/1/2020 | 12/31/2020 |
| D0001 | 01/01/2021 | 12/31/2021 |
| E0001 | 1/1/2019 | 6/30/2019 |
| E0001 | 1/1/2020 | 12/31/2020 |
Answer
My input DataFrame, based on your data:
```python
from datetime import date

b = "Identifier Eff_Date End_Date".split()
a = [
    ["A0001", date(2019, 2, 1), date(2021, 3, 31)],
]
df = spark.createDataFrame(a, b)

df.printSchema()
df.show()
```
```
root
 |-- Identifier: string (nullable = true)
 |-- Eff_Date: date (nullable = true)
 |-- End_Date: date (nullable = true)

+----------+----------+----------+
|Identifier|  Eff_Date|  End_Date|
+----------+----------+----------+
|     A0001|2019-02-01|2021-03-31|
+----------+----------+----------+
```
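Note that in your original data the dates are strings like 2/1/2019, not date columns. A minimal sketch of parsing those first, assuming the columns are named Eff_Date and End_Date and hold M/d/yyyy strings:

```python
from pyspark.sql import functions as F

# Assumption: dates arrive as slash-formatted strings; convert them to
# DateType so the date comparisons and year arithmetic below work.
df = (
    df.withColumn("Eff_Date", F.to_date("Eff_Date", "M/d/yyyy"))
      .withColumn("End_Date", F.to_date("End_Date", "M/d/yyyy"))
)
```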
Here is the function you need:
```python
def generate_dates(start, end):
    """Return one (start, end) pair per calendar year between start and end."""
    out = []
    for i in range((end.year - start.year) + 1):
        # Calendar bounds of the current year
        s = date(start.year + i, 1, 1)
        e = date(start.year + i, 12, 31)
        # Clamp the year bounds to the actual effective/end dates
        out.append((max(s, start), min(e, end)))
    return out
```
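A quick sanity check in plain Python, using the A0001 row from your sample:

```python
from datetime import date

print(generate_dates(date(2019, 2, 1), date(2021, 3, 31)))
# [(datetime.date(2019, 2, 1), datetime.date(2019, 12, 31)),
#  (datetime.date(2020, 1, 1), datetime.date(2020, 12, 31)),
#  (datetime.date(2021, 1, 1), datetime.date(2021, 3, 31))]
```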
Let's turn that into a UDF and use it on the DataFrame with explode, and you should get your expected result. This also answers your return-type question: generate_dates returns a list of (date, date) tuples, so the matching Spark type is an ArrayType of a two-field StructType of DateType:
```python
from pyspark.sql import functions as F, types as T

schema = T.ArrayType(
    T.StructType(
        [
            T.StructField("New_Eff_Date", T.DateType()),
            T.StructField("New_End_Date", T.DateType()),
        ]
    )
)

generate_dates_udf = F.udf(generate_dates, schema)

df.withColumn(
    "new_dates", generate_dates_udf(F.col("Eff_Date"), F.col("End_Date"))
).withColumn("new_date", F.explode("new_dates")).select(
    "Identifier",
    "new_date.New_Eff_Date",
    "new_date.New_End_Date",
).show()
```
```
+----------+------------+------------+
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
```
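As a side note, a sketch of a more compact way to flatten the array of structs is the SQL generator function inline, which emits one row per array element and one column per struct field; this should give the same result:

```python
# Same pipeline, flattening with inline() instead of explode + struct access.
df.withColumn(
    "new_dates", generate_dates_udf(F.col("Eff_Date"), F.col("End_Date"))
).selectExpr("Identifier", "inline(new_dates)").show()
```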
Same logic, but in pure Spark (no UDF). Note that F.sequence requires Spark 2.4 or later:
```python
df = (
    # One row per calendar year covered by the interval
    df.withColumn("year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date"))))
    # January 1 of that year
    .withColumn(
        "s",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("01"), F.lit("01")).cast(
            "date"
        ),
    )
    # December 31 of that year
    .withColumn(
        "e",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("12"), F.lit("31")).cast(
            "date"
        ),
    )
    # Clamp the year bounds to the actual effective/end dates
    .select(
        "Identifier",
        F.greatest(F.col("Eff_Date"), F.col("s")).alias("New_Eff_Date"),
        F.least(F.col("End_Date"), F.col("e")).alias("New_End_Date"),
    )
)

df.show()
```
```
+----------+------------+------------+
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
```
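If you need the M/d/yyyy strings from your expected output rather than ISO dates, a sketch of formatting them back (date_format uses the same pattern letters as to_date):

```python
# Render the DateType columns back to the slash format used in the question.
df.select(
    "Identifier",
    F.date_format("New_Eff_Date", "M/d/yyyy").alias("New_Eff_Date"),
    F.date_format("New_End_Date", "M/d/yyyy").alias("New_End_Date"),
).show()
```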