Given a Spark dataframe with the following columns, I am trying to construct an incremental/running count for each id based on when the contents of the event column evaluate to True.
spark.createDataFrame(
    [
        ('2018-01-01', 'A', False),
        ('2018-01-02', 'A', False),
        ('2018-01-03', 'A', True),
        ('2018-01-04', 'A', False),
        ('2018-01-05', 'A', True),
        ('2018-01-06', 'A', False),
        ('2018-01-01', 'B', False),
        ('2018-01-02', 'B', True),
        ('2018-01-03', 'B', False),
    ],
    ['date', 'id', 'event']
).show()

+----------+---+-----+
|      date| id|event|
+----------+---+-----+
|2018-01-01|  A|false|
|2018-01-02|  A|false|
|2018-01-03|  A| true|
|2018-01-04|  A|false|
|2018-01-05|  A| true|
|2018-01-06|  A|false|
|2018-01-01|  B|false|
|2018-01-02|  B| true|
|2018-01-03|  B|false|
+----------+---+-----+
Here, a new column called result would be created that contains the incremental count.
+----------+---+-----+------+
|      date| id|event|result|
+----------+---+-----+------+
|2018-01-01|  A|false|     0|
|2018-01-02|  A|false|     0|
|2018-01-03|  A| true|     1|
|2018-01-04|  A|false|     1|
|2018-01-05|  A| true|     2|
|2018-01-06|  A|false|     2|
|2018-01-01|  B|false|     0|
|2018-01-02|  B| true|     1|
|2018-01-03|  B|false|     1|
+----------+---+-----+------+
I’ve tried using window functions but am stumped at this point. Ideally, the solution would increment the count row-wise without any group by or aggregation functions.
Thanks in advance.
Answer
You can use the sum function over a window, casting your event column to an int:
import pyspark.sql.functions as f
from pyspark.sql import Window

df = df.withColumn(
    'result',
    f.sum(f.col('event').cast('int')).over(Window.partitionBy('id').orderBy('date'))
)

df.sort('id', 'date').show()
# +----------+---+-----+------+
# |      date| id|event|result|
# +----------+---+-----+------+
# |2018-01-01|  A|false|     0|
# |2018-01-02|  A|false|     0|
# |2018-01-03|  A| true|     1|
# |2018-01-04|  A|false|     1|
# |2018-01-05|  A| true|     2|
# |2018-01-06|  A|false|     2|
# |2018-01-01|  B|false|     0|
# |2018-01-02|  B| true|     1|
# |2018-01-03|  B|false|     1|
# +----------+---+-----+------+
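
As a small variant (just a sketch, assuming df is the dataframe built in the question): when an orderBy is present, Spark's default window frame already runs from the start of the partition to the current row, but you can spell the frame out with rowsBetween and count a when expression instead of summing a cast:

import pyspark.sql.functions as f
from pyspark.sql import Window

# Explicit running frame: every row from the start of the id partition up to the current row.
w = (Window.partitionBy('id')
     .orderBy('date')
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# count() ignores nulls, so counting a when(event, True) expression
# tallies only the rows where event is True, giving the same running count.
df = df.withColumn('result', f.count(f.when(f.col('event'), True)).over(w))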