PySpark: Compare two rows in a dataframe

I’m attempting to compare one row in a dataframe with the next to see the difference in their timestamps. Currently the data looks like:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07

I’ve tried mapping a function onto the dataframe to compare rows like this (note: I’m trying to get rows with a difference greater than 4 hours):

items = (df.limit(10)
           .orderBy('itemid', desc('stamp'))
           .map(lambda x, y: (x.stamp - y.stamp) > 14400).collect())

But I’m getting the following error:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

I believe this is due to my using the map function incorrectly. Help with map, or a different solution, would be appreciated.

UPDATE: @zero323’s answer was informative about my improper use of mapping; however, the system I’m using runs a Spark version earlier than 2.0.2, and I’m working with data in Cassandra.

I managed to solve it with mapPartitions. See my answer below.

UPDATE (2017/03/27): Since originally marking the answer on this post, my understanding of Spark has improved significantly. I’ve updated my answer below to show my current solution.


Answer

The comment by @ShuaiYuan on the original answer is correct. Over the last year I’ve developed a much better understanding of how Spark works and have actually rewritten the program I was working on for this post.

NEW ANSWER (2017/03/27)
To compare adjacent rows of the dataframe I ended up using an RDD. I group the data by key (in this case the item id) and ignore eventid, as it’s irrelevant to this calculation. I then map a lambda function onto the rows, returning a tuple of the key and a list of tuples containing the start and end of each event gap. The gap tuples come from the findGaps function, which iterates over the list of values (sorted timestamps) linked to each key. Once this is complete, I filter out keys with no time gaps and then flatMapValues to return the data to a more SQL-like format. This is done with the following code:

# Find time gaps in a sorted list of datetimes where the interval between
# consecutive firings is longer than the given duration (in seconds).
def findGaps(dates, duration):
    result = []
    for index, item in enumerate(dates[:-1]):
        gap = (dates[index + 1] - item).total_seconds()
        if gap > duration:
            # build outage tuple and append to list
            # format: (start, stop, duration)
            result.append((item, dates[index + 1], gap))
    return result
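
As a quick sanity check with a 4-hour threshold (14400 seconds) — the first two timestamps are from the question’s table, the third is made up to force a gap:

from datetime import datetime

stamps = [datetime(2016, 7, 2, 12, 1, 40),
          datetime(2016, 7, 2, 12, 21, 23),
          datetime(2016, 7, 2, 18, 0, 0)]

findGaps(stamps, 14400)
# only the 12:21:23 -> 18:00:00 interval exceeds 4 hours:
# [(datetime(2016, 7, 2, 12, 21, 23), datetime(2016, 7, 2, 18, 0, 0), 20317.0)]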

limit = 14400  # gap threshold in seconds (4 hours, per the question)

outage_list = (outage_join_df.rdd
                             .groupByKey()
                             .map(lambda row: (
                                      row[0],
                                      findGaps(
                                          sorted(list(row[1])),
                                          limit
                                      )
                                  ))
                             .filter(lambda row: len(row[1]) > 0)
                             .flatMapValues(lambda gaps: gaps)
                             .map(lambda row: (
                                  row[0]['itemid'],     # itemid
                                  row[1][0].date(),     # date
                                  row[1][0],            # start
                                  row[1][1],            # stop
                                  row[1][2]             # duration
                              ))
                             .collect())
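
For completeness: on Spark versions with DataFrame window functions, the same per-item comparison can be written without dropping to an RDD. A minimal sketch (not the code I ran; column names assumed from the question):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('itemid').orderBy('timestamp')

gaps = (df
        .withColumn('next_ts', F.lead('timestamp').over(w))
        .withColumn('gap_seconds',
                    F.col('next_ts').cast('long') - F.col('timestamp').cast('long'))
        .filter(F.col('gap_seconds') > 14400))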

ORIGINAL ANSWER (WRONG)
I thought I had solved it using mapPartitions (note that this only compares consecutive rows within each partition, so any gap spanning a partition boundary is silently missed):

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()
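
For reference, findOutage can be exercised outside Spark with a stand-in for Row (the data here is made up):

from collections import namedtuple
from datetime import datetime

Item = namedtuple('Item', ['itemid', 'stamp'])

# stamps in descending order, matching orderBy('itemid', desc('stamp'))
rows = [Item(134, datetime(2016, 7, 2, 18, 0, 0)),
        Item(134, datetime(2016, 7, 2, 12, 21, 23))]

list(findOutage(iter(rows)))
# [{'item': 134, 'start': '2016-07-02T12:21:23', 'stop': '2016-07-02T18:00:00'}]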

Thanks everyone for the help!
