python - Why do we use pyspark UDF when python functions are faster than them? (Note. Not worrying about spark SQL commands)
Question
I have a dataframe:
from pyspark.sql.functions import col, count, rand, udf

df = (spark
    .range(0, 10 * 1000 * 1000)
    .withColumn('id', (col('id') / 1000).cast('integer'))
    .withColumn('v', rand()))
Output:
+---+-------------------+
| id| v|
+---+-------------------+
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
+---+-------------------+
Now, a simple operation - adding 1 to 'v' - can be done via SQL functions or a UDF.
Setting aside SQL (the best-performing option), we can create a UDF:
@udf("double")
def plus_one(v):
    return v + 1
and call it:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Time: 16.5sec
But here is my question:
if I DO NOT use udf and directly write:
def plus_one(v):
    return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Time Taken - 352ms
In a nutshell, the UDF query took ~16 sec whereas the normal Python function took ~350 ms.
To compare,
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
Time: 347ms
Here is my dilemma:
If I can achieve the same result with a normal Python function that performs comparably to the built-in functions...
Q. Why don't we use a python function directly?
Q. Does registering a UDF only matter if we plan to use it inside a SQL-like command?
There must be some optimization reason why we don't do it... or maybe something related to how a Spark cluster works?
[ There are 2 questions already answered, but both end with "SQL built-in functions are preferred...". I'm comparing a Python function with a UDF and its feasibility in a PySpark application. ]
Edit: I have done this with pandas_udf too:
@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
Time: 5.26 secs
I've attached a screenshot:
The output for adding 1 to a value - Python function (standalone), UDF, SQL
Solution
Your scenario works because you don't actually add 1 in Python; it's added in Java, in a way very similar to what happens when you do it with SQL.
Let's break the case apart:

- You call `plus_one(df.v)`, which is equivalent to just passing `df.v + 1`.
- Type `df.v + 1` in your favorite REPL and you'll see that it returns an object of type `Column`.
- How can that be? The `Column` class has the `__radd__` magic method overridden (along with some others), and it returns a new `Column` instance carrying the instruction to add 1 to the specified column.

In summary: `withColumn` always accepts an object of type `Column` as its second argument, and the trick of adding 1 to your column is Python magic.
That's why it works faster than a `udf` or a vectorized `udf`: those need to launch a Python process and serialize/deserialize data (vectorized UDFs can do this faster with `arrow`, which avoids some of the serialization cost), and then do the computation in the slower Python process.
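To see the mechanism without Spark at all, here is a minimal toy sketch (not the real pyspark API - `Column`, `expr`, and `plus_one` here are hypothetical stand-ins) showing how overriding `__add__`/`__radd__` lets `v + 1` build an expression object instead of computing anything in Python:

```python
class Column:
    """Toy stand-in for pyspark.sql.Column: it records expressions, it doesn't compute."""
    def __init__(self, expr):
        self.expr = expr              # a string describing the expression

    def __add__(self, other):         # handles: col + 1
        return Column(f"({self.expr} + {other})")

    def __radd__(self, other):        # handles: 1 + col
        return Column(f"({other} + {self.expr})")

    def __repr__(self):
        return f"Column<{self.expr}>"


def plus_one(v):
    return v + 1                      # no arithmetic happens here...


v = Column("v")
result = plus_one(v)                  # ...it just builds a new expression object
print(result)                         # Column<(v + 1)>
print(isinstance(result, Column))     # True
```

The real `Column` builds a JVM-side expression tree the same way, which Catalyst then compiles and executes entirely in Java - hence the ~350 ms timing that matches the SQL version.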