I don’t know how to interpolate in PySpark when the DataFrame contains many columns. Let me explain.
Python:
from pyspark.sql.functions import to_timestamp

# Sample data: per-user event counts at irregular timestamps.
# Columns: webID (user), aType (category), timestamp (string, parsed
# below), counts (monotonically growing counter per group).
df = spark.createDataFrame([
    ("John", "A", "2018-02-01 03:00:00", 60),
    ("John", "A", "2018-02-01 03:03:00", 66),
    ("John", "A", "2018-02-01 03:05:00", 70),
    ("John", "A", "2018-02-01 03:08:00", 76),
    ("Mo", "A", "2017-06-04 01:05:00", 10),
    ("Mo", "A", "2017-06-04 01:07:00", 20),
    ("Mo", "B", "2017-06-04 01:10:00", 35),
    ("Mo", "B", "2017-06-04 01:11:00", 40),
], ("webID", "aType", "timestamp", "counts")).withColumn(
    # Convert the string column to a proper TimestampType so it can be
    # used as a pandas DatetimeIndex inside the resampling UDF.
    "timestamp", to_timestamp("timestamp")
)
I need to group by webID and interpolate the counts values at 1-minute intervals. However, when I apply the code shown below,
Python:
from operator import attrgetter

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType


def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    """Build a GROUPED_MAP pandas UDF that resamples each group to `freq`.

    Args:
        schema: the Spark schema of the input DataFrame (e.g. ``df.schema``);
            its fields are sorted by name because pandas' ``sort_index(axis=1)``
            below reorders the columns alphabetically, and the declared output
            schema must match that order.
        freq: pandas resampling frequency string, e.g. ``"60S"``.
        timestamp_col: name of the timestamp column used as the resampling
            index (default ``"timestamp"``).
        **kwargs: unused; accepted for forward compatibility.

    Returns:
        A pandas UDF suitable for ``df.groupBy(...).apply(...)``.
    """
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        # Index by time, upsample to `freq`, linearly interpolate the
        # numeric columns, then forward-fill the non-numeric ones.
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        # Match the alphabetical column order declared in the schema above.
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _


df.groupBy("webID").apply(resample(df.schema, "60S")).show()
Error:
```
py4j.protocol.Py4JJavaError: An error occurred while calling o371.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 31.0 failed 4 times, most recent failure: Lost task 77.3 in stage 31.0 (TID 812, 27faa516aadb4c40b7d7586d7493143c0021c825663, executor 2): java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
```
Answer
Set the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
Python:
def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    """Build a GROUPED_MAP pandas UDF that resamples each group to `freq`.

    Identical to the original, except it sets ``ARROW_PRE_0_15_IPC_FORMAT=1``
    inside the worker process, which makes pyarrow >= 0.15 emit the legacy
    Arrow IPC stream format expected by Spark 2.x and fixes the
    ``java.lang.IllegalArgumentException`` in ``ByteBuffer.allocate``.

    Args:
        schema: the Spark schema of the input DataFrame; fields are sorted by
            name to match the ``sort_index(axis=1)`` column order below.
        freq: pandas resampling frequency string, e.g. ``"60S"``.
        timestamp_col: name of the timestamp column used as the index.
        **kwargs: unused; accepted for forward compatibility.

    Returns:
        A pandas UDF suitable for ``df.groupBy(...).apply(...)``.
    """
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        import os  # add this line
        # Must be set in the executor's Python worker (not just the driver)
        # before Arrow serializes the returned pandas DataFrame.
        os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'  # add this line
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        # Match the alphabetical column order declared in the schema above.
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _