
Why do we use PySpark UDFs when plain Python functions appear faster? (Note: not worrying about Spark SQL commands)

I have a DataFrame:

from pyspark.sql.functions import col, count, rand

df = (spark
  .range(0, 10 * 1000 * 1000)
  .withColumn('id', (col('id') / 1000).cast('integer'))
  .withColumn('v', rand()))

Output:

+---+-------------------+
| id|                  v|
+---+-------------------+
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
+---+-------------------+

Now, a simple operation, adding 1 to 'v', can be done either with built-in SQL functions or with a UDF.

If we set the SQL approach (the best performer) aside for a moment, we can create a UDF:

from pyspark.sql.functions import udf

@udf("double")
def plus_one(v):
    return v + 1

and call it:

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

Time: 16.5 sec
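For reference, here is a minimal way to take such wall-clock timings; the exact measurement method isn't stated above, so this is just one option:

import time

start = time.time()
# show() is an action, so it triggers the actual computation
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
print('elapsed: %.2f s' % (time.time() - start))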

But here is my question:

If I do NOT use a UDF and write the function directly:

def plus_one(v):
    return v + 1

df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

Time: 352 ms

In a nutshell, the UDF query took ~16 s, whereas the plain Python function took ~350 ms.

For comparison, the built-in SQL expression:

df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()

Time: 347 ms

Here is my dilemma:

If I can handle the same scenario with a plain Python function that performs comparably to the built-in functions…

Q. Why don’t we just use a Python function directly?

Q. Does registering a UDF only matter if we plan to use it inside a SQL command? (Sketch below.)
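For context, by "registering" I mean something like the following; the names plus_one_sql and df_table are just illustrative:

# Register a plain Python function as a SQL-callable UDF.
spark.udf.register('plus_one_sql', lambda v: v + 1, 'double')

# Expose the DataFrame to SQL and call the registered UDF by name.
df.createOrReplaceTempView('df_table')
spark.sql('SELECT id, plus_one_sql(v) AS v FROM df_table').show()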

There must be some optimization-related reason why we don’t do this… or maybe something to do with how a Spark cluster works?

[ There are two questions already answered, but both of them end with "SQL built-in functions are preferred…". I’m comparing a plain Python function with a UDF and its feasibility in a PySpark application. ]

Edit: I have done this with pandas_udf too:

from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def vectorized_plus_one(v):
    # v arrives as a pandas Series, so the +1 is vectorized
    return v + 1

df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

Time: 5.26 sec
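Note that a pandas UDF still ships data to a Python worker; it just does so in larger Arrow batches. If I understand correctly, the batch size is controlled by a Spark conf (shown here with its default value):

# Rows per Arrow batch handed to the pandas UDF; 10000 is the default.
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '10000')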

I’ve attached a screenshot:

[Screenshot: timing output for adding 1 to the value with a standalone Python function, a UDF, and SQL]


Answer

Your scenario works because you don’t actually add 1 in Python; the addition happens in the JVM, in a way very similar to what happens when you do it with SQL.

Let’s break the case down:

  1. You do plus_one(df.v), which is equal to just passing df.v + 1.
  2. Type df.v + 1 in your favorite REPL and you’ll see that it returns an object of type Column.
  3. How can that be? The Column class overrides the __add__ magic method (along with __radd__ and several others) and returns a new Column instance carrying the instruction to add 1 to the specified column. (See the sketch below.)
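You can check this in a REPL; a quick sketch, reusing the df from the question:

from pyspark.sql import Column

expr = df.v + 1                  # dispatches to Column.__add__; no Python arithmetic runs here
print(type(expr))                # <class 'pyspark.sql.column.Column'>
print(isinstance(expr, Column))  # True: just an unevaluated column expression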

In summary: withColumn always takes an object of type Column as its second argument, and the trick of adding 1 to your column is Python’s magic-method dispatch at work.

That’s why this runs faster than both the UDF and the vectorized UDF: those have to launch a Python worker process, serialize/deserialize the data (vectorized UDFs cut this cost by using Arrow), and do the computation in the slower Python process.
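The difference also shows up in the physical plans: the plain column expression compiles to an ordinary Project node that stays in the JVM, while the UDF version adds a BatchEvalPython step that ships rows to a Python worker. A quick way to check (exact output varies by Spark version):

df.withColumn('v', df.v + 1).explain()        # Project only: stays in the JVM
df.withColumn('v', plus_one(df.v)).explain()  # with the @udf plus_one: shows BatchEvalPython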
