I have a dataframe:
from pyspark.sql.functions import col, count, rand

df = (spark
      .range(0, 10 * 1000 * 1000)
      .withColumn('id', (col('id') / 1000).cast('integer'))
      .withColumn('v', rand()))
Output:
+---+-------------------+
| id|                  v|
+---+-------------------+
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
+---+-------------------+
Now, a simple operation, adding 1 to 'v', can be done via SQL functions or via a UDF.
If we set aside the SQL expression (the best performer), we can create a UDF as:
@udf("double") def plus_one(v): return v + 1
and call it:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Time: 16.5 sec
But here is my question:
if I do NOT use a UDF and instead write directly:
def plus_one(v):
    return v + 1

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Time: 352 ms
In a nutshell, the UDF query took ~16 sec whereas the plain Python function took ~350 ms.
For comparison, the built-in SQL expression:
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
Time: 347 ms
Here is my dilemma:
If I can handle the same scenario with a normal Python function that performs comparably to the built-in functions…
Q. Why don't we just use a Python function directly?
Q. Does registering a UDF only matter if we plan to use it inside a SQL command?
There must be some optimization reason why we don't do this… or maybe it's something related to how the Spark cluster works?
[ There are two similar questions already answered, but both of those end with "SQL built-in functions are preferred…". Here I'm comparing a plain Python function with a UDF and asking about its feasibility in a PySpark application. ]
Edit: I have done this with pandas_udf too:
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
Time: 5.26 sec
I've attached a screenshot:
[Screenshot: the output for adding 1 to a value: Python function (standalone), UDF, SQL]
Answer
Your scenario works because you don't actually add 1 in Python; the addition happens in the JVM, in a way very similar to what happens when you do it with SQL.
Let's take the case apart:

- You call plus_one(df.v), which is equivalent to just passing df.v + 1.
- Type df.v + 1 in your favorite REPL and you'll see that it returns an object of type Column (see the quick check after this list).
- How can that be? The Column class overrides the __add__ magic method (along with __radd__ and some others), so v + 1 returns a new Column instance carrying the instruction to add 1 to the specified column.
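You can verify this quickly in a REPL (using the df from the question):

expr = df.v + 1      # no data is touched; this only builds an expression
print(type(expr))    # <class 'pyspark.sql.column.Column'>
print(expr)          # prints something like Column<'(v + 1)'>; exact repr varies by Spark version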
In summary: withColumn always expects an object of type Column as its second argument, and the trick of adding 1 to your column is plain Python operator-overloading magic.
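To make that concrete, here is a minimal illustration (using the question's df and the plain, non-decorated plus_one): all three calls below hand withColumn an equivalent Column expression, and none of them runs any per-row Python code:

from pyspark.sql.functions import col

# Equivalent ways to pass a Column expression to withColumn; the
# arithmetic is executed in the JVM, not in Python.
df.withColumn('v', df.v + 1)
df.withColumn('v', col('v') + 1)
df.withColumn('v', plus_one(df.v))   # plus_one as a plain function, not the @udf version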
That's why it works faster than the udf and the vectorized udf: both need to launch a Python worker process and serialize/deserialize the data (vectorized UDFs use Arrow to make that serialization cheaper), and the computation happens in the slower Python process.
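If it helps, here is a toy sketch (emphatically not Spark's real implementation) of how a Column-like class can use magic methods to record an operation instead of performing it:

# Toy sketch, NOT Spark's actual code: a Column-like class that records
# the requested operation as an expression string instead of computing it.
class FakeColumn:
    def __init__(self, expr):
        self.expr = expr

    def __add__(self, other):
        return FakeColumn(f"({self.expr} + {other})")

    __radd__ = __add__  # addition commutes, so 1 + c builds the same expression

def plus_one(v):
    return v + 1  # with a FakeColumn argument this calls __add__, building an expression

print(plus_one(FakeColumn("v")).expr)  # prints: (v + 1)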