I have a Delta table that contains Thrift data from Kafka, and I use a UDF to deserialize it. I have no issues with a regular Python UDF, but I get an error when I try to use a Pandas UDF.
This runs fine, i.e. the regular UDF:
def decoder(thrift_data):
    schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
    schema = schema_file.SchemaClass()
    decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
    json_data = proto.struct_to_json(decoded_payload)
    return json.dumps(json_data)


decoder_udf = udf(decoder, StringType())
data = spark.sql("""SELECT value FROM data_table""")
data = data.withColumn('decoded_json', decoder_udf(data.value))
But when I use a Pandas UDF:
def decoder(thrift_data: pd.Series) -> pd.Series:
    schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
    schema = schema_file.SchemaClass()
    decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
    json_data = proto.struct_to_json(decoded_payload)
    return json.dumps(json_data)


decoder_udf = pandas_udf(decoder, returnType=StringType())
data = spark.sql("""SELECT value FROM data_table""")
data = data.withColumn('decoded_json', decoder_udf(data.value))
I get the error: PythonException: 'RuntimeError: Result vector from pandas_udf was not the required length: expected 5000, got 651'.
Answer
Figured out the solution: a pandas UDF receives a pd.Series containing a whole batch of values rather than a single value, so the function has to decode each element and return the output as a pd.Series of the same length as the input.
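For reference, here is a minimal sketch of the corrected pandas UDF. It assumes the same Databricks environment (an existing spark session) and schema path as above, and it guesses at the thriftpy2 imports the question omitted: deserialize from thriftpy2.utils, TCyBinaryProtocolFactory from thriftpy2.protocol, and thriftpy2.protocol.json imported as proto for struct_to_json.

import json

import pandas as pd
import thriftpy2
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
from thriftpy2.protocol import TCyBinaryProtocolFactory
from thriftpy2.protocol import json as proto  # assumed source of struct_to_json
from thriftpy2.utils import deserialize


def decoder(thrift_data: pd.Series) -> pd.Series:
    # Load the schema once per batch rather than once per record.
    schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")

    def decode_one(payload):
        # Each element of the input series is one serialized Thrift record.
        schema = schema_file.SchemaClass()
        decoded_payload = deserialize(schema, payload, TCyBinaryProtocolFactory())
        return json.dumps(proto.struct_to_json(decoded_payload))

    # Decode element-wise so the returned series has the same length as the
    # input batch, which is what pandas_udf requires.
    return thrift_data.apply(decode_one)


decoder_udf = pandas_udf(decoder, returnType=StringType())

data = spark.sql("SELECT value FROM data_table")
data = data.withColumn('decoded_json', decoder_udf(data.value))

With the element-wise apply, a batch of 5000 input rows yields 5000 output values, so the length-mismatch error goes away.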