This is an extension of an earlier question I asked, "How to calculate difference between dates excluding weekends in PySpark 2.2.0". My Spark DataFrame looks like the one below and can be generated with the accompanying code:

    df = spark.createDataFrame(
        [
            (1, "John Doe", "2020-11-30", 1),
            (2, "John Doe", "2020-11-27", 2),
            (4, "John Doe", "2020-12-01", 0),
            (5, "John Doe", "2020-10-02", 1),
            (6, "John Doe", "2020-12-03", 1),
            (7, "John Doe", "2020-12-04", 1),
        ],
        ("id", "name", "date", "count"),
    )

    +---+--------+----------+-----+
    | id|    name|      date|count|
    +---+--------+----------+-----+
    |  5|John Doe|2020-10-02|    1|
    |  2|John Doe|2020-11-27|    2|
    |  1|John Doe|2020-11-30|    1|
    |  4|John Doe|2020-12-01|    0|
    |  6|John Doe|2020-12-03|    1|
    |  7|John Doe|2020-12-04|    1|
    +---+--------+----------+-----+

I am trying to calculate cumulative sums over windows of 2, 3, 4, 5 and 30 days. Below is sample code for the 2-day window and the resulting table.

    import pyspark.sql.functions as F
    from pyspark.sql import Window

    # Convert a number of days to seconds, because the window orders by epoch seconds
    days = lambda i: i * 86400

    # Order each person's rows by the date expressed as epoch seconds
    order_col = F.col("date").cast("timestamp").cast("long")

    # An N-day window covers the current day plus the N-1 preceding calendar days
    windowval_2 = Window.partitionBy("name").orderBy(order_col).rangeBetween(days(-1), days(0))
    windowval_3 = Window.partitionBy("name").orderBy(order_col).rangeBetween(days(-2), days(0))
    windowval_4 = Window.partitionBy("name").orderBy(order_col).rangeBetween(days(-3), days(0))

    df = df.withColumn("cum_sum_2d_temp", F.sum("count").over(windowval_2))

    +---+--------+----------+-----+---------------+
    | id|    name|      date|count|cum_sum_2d_temp|
    +---+--------+----------+-----+---------------+
    |  5|John Doe|2020-10-02|    1|              1|
    |  2|John Doe|2020-11-27|    2|              2|
    |  1|John Doe|2020-11-30|    1|              1|
    |  4|John Doe|2020-12-01|    0|              1|
    |  6|John Doe|2020-12-03|    1|              1|
    |  7|John Doe|2020-12-04|    1|              2|
    +---+--------+----------+-----+---------------+

What I want is for the date range to exclude weekends. For example, in my table 2020-11-27 is a Friday and 2020-11-30 is a Monday, so the difference between them is 1 once Saturday and Sunday are excluded. The row for 2020-11-30 should therefore show the sum of the 2020-11-27 and 2020-11-30 values in the ‘cum_sum_2d_temp’ column, which is 3. I am looking to combine the solution to my earlier question with the date-range window.
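The weekday-only difference described above can be sketched in plain Python, without Spark. This is only an illustration of the counting rule: the helper name `business_days_between` is made up for this example, it treats Monday to Friday as working days, counts the half-open interval from the start date up to but not including the end date, and ignores public holidays.

```python
from datetime import date, timedelta


def business_days_between(start: str, end: str) -> int:
    """Count Monday-Friday days in the half-open interval [start, end)."""
    current = date.fromisoformat(start)
    last = date.fromisoformat(end)
    count = 0
    while current < last:
        if current.weekday() < 5:  # 0-4 are Monday to Friday
            count += 1
        current += timedelta(days=1)
    return count


# Friday 2020-11-27 to Monday 2020-11-30: only the Friday is counted
print(business_days_between("2020-11-27", "2020-11-30"))  # prints 1
```

With this rule, Friday and the following Monday are one business day apart, so a 2-day window ending on the Monday would include the Friday.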
Answer
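Calculate date_dif, the number of business days between each date and the earliest date for that name, then use it as the ordering column of the range window: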

    import numpy as np
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window
    from pyspark.sql.types import IntegerType

    df = spark.createDataFrame(
        [
            (1, "John Doe", "2020-11-30", 1),
            (2, "John Doe", "2020-11-27", 2),
            (4, "John Doe", "2020-12-01", 0),
            (5, "John Doe", "2020-10-02", 1),
            (6, "John Doe", "2020-12-03", 1),
            (7, "John Doe", "2020-12-04", 1),
        ],
        ("id", "name", "date", "count"),
    )

    # Business days (Mon-Fri) from date2 up to, but not including, date1
    workdaysUDF = F.udf(
        lambda date1, date2: int(np.busday_count(date2, date1))
        if (date1 is not None and date2 is not None)
        else None,
        IntegerType(),
    )

    # date_dif: business-day offset of each row from the earliest date per name
    df = df.withColumn(
        "date_dif",
        workdaysUDF(
            F.col("date"),
            F.first(F.col("date")).over(Window.partitionBy("name").orderBy("date")),
        ),
    )

    # rangeBetween(-days, 0) covers the current date_dif and the `days` preceding
    # business days, so windowval(2) spans 3 business days; windowval(1) matches
    # the question's 2-day convention of rangeBetween(days(-1), days(0)).
    windowval = lambda days: Window.partitionBy("name").orderBy("date_dif").rangeBetween(-days, 0)

    df = df.withColumn("cum_sum", F.sum("count").over(windowval(2)))
    df.show()

    +---+--------+----------+-----+--------+-------+
    | id|    name|      date|count|date_dif|cum_sum|
    +---+--------+----------+-----+--------+-------+
    |  5|John Doe|2020-10-02|    1|       0|      1|
    |  2|John Doe|2020-11-27|    2|      40|      2|
    |  1|John Doe|2020-11-30|    1|      41|      3|
    |  4|John Doe|2020-12-01|    0|      42|      3|
    |  6|John Doe|2020-12-03|    1|      44|      1|
    |  7|John Doe|2020-12-04|    1|      45|      2|
    +---+--------+----------+-----+--------+-------+

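As a sanity check on the window bounds, the same rolling sum can be reproduced in plain Python on the six sample rows. This is a rough sketch rather than part of the answer: `business_day_offset` and `rolling_sums` are hypothetical helpers, weekends are the only non-working days, and `lookback` plays the role of `days` in the answer's `rangeBetween(-days, 0)`.

```python
from datetime import date, timedelta

# (date, count) pairs copied from the question's sample data
rows = [
    ("2020-11-30", 1), ("2020-11-27", 2), ("2020-12-01", 0),
    ("2020-10-02", 1), ("2020-12-03", 1), ("2020-12-04", 1),
]


def business_day_offset(start: str, end: str) -> int:
    """Count Monday-Friday days in the half-open interval [start, end)."""
    current, last = date.fromisoformat(start), date.fromisoformat(end)
    count = 0
    while current < last:
        if current.weekday() < 5:  # skip Saturday (5) and Sunday (6)
            count += 1
        current += timedelta(days=1)
    return count


def rolling_sums(rows, lookback):
    """Sum counts whose offset lies in [offset - lookback, offset], per row."""
    ordered = sorted(rows)  # ISO date strings sort chronologically
    earliest = ordered[0][0]
    offsets = [business_day_offset(earliest, d) for d, _ in ordered]
    return [
        sum(c for off, (_, c) in zip(offsets, ordered) if cur - lookback <= off <= cur)
        for cur in offsets
    ]


print(rolling_sums(rows, 2))  # [1, 2, 3, 3, 1, 2], same as cum_sum above
print(rolling_sums(rows, 1))  # [1, 2, 3, 1, 1, 2], current plus one prior business day
```

A lookback of 2 reproduces the cum_sum column shown in the answer, while a lookback of 1 corresponds to the question's 2-day window and still gives 3 for 2020-11-30.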