首页 > 解决方案 > 如何在条件语句中停止 Spark 解析 UDF 列

问题描述

我想执行一些条件分支以避免计算不必要的节点,但我注意到如果条件语句中的源列是 UDF,那么无论如何都会解决否则:

 @pandas_udf("double", PandasUDFType.SCALAR)
 def udf_that_throws_exception(*cols):
   raise Exception('Error')

 @pandas_udf("int", PandasUDFType.SCALAR)
 def simple_mul_udf(*cols):
   result = cols[0]
   for c in cols[1:]:
     result *= c
   return result

 df = spark.range(0,5)

 df = df.withColumn('A', lit(1))
 df = df.withColumn('B', lit(2))

 df = df.withColumn('udf', simple_mul('A','B'))
 df = df.withColumn('sql', expr('A*B'))

 df = df.withColumn('res', when(df.sql < 100, lit(1)).otherwise(udf_that_throws(lit(0))))

上面的代码按预期工作,这种情况下的语句总是正确的,所以我的抛出异常的 UDF 永远不会被调用。

但是,如果我将条件更改为使用df.udf,那么突然之间会调用其他 UDF,即使条件结果没有改变,我也会得到异常。

我认为我可以通过从条件中删除 UDF 来混淆它,但是无论如何都会出现相同的结果:

  df = df.withColumn('cond', when(df.udf < 100, lit(1)).otherwise(lit(0)))
  df = df.withColumn('res', when(df.cond == lit(1), lit(1)).otherwise(udf_that_throws_exception(lit(0))))

我想这与 Spark 优化的方式有关,这很好,但我正在寻找任何方法来做到这一点而不会产生成本。有任何想法吗?

根据请求编辑以获取更多信息。我们正在编写一个可以接受任意模型的处理引擎,并且代码会生成图形。在此过程中,我们会根据运行时的值状态做出决策。我们大量使用 pandas UDF。因此,想象一下我们在图中有多个路径的情况,并且根据运行时的某些条件,我们希望遵循其中一个路径,而其他所有路径都保持不变。

我想将此逻辑编码到图中,因此我不必在代码中收集和分支。

我提供的示例代码仅用于演示目的。我面临的问题是,如果 IF 语句中使用的列是 UDF,或者似乎是从 UDF 派生的,那么即使它从未实际使用过,也会始终执行 OTHERWISE 条件。如果 IF/ELSE 是诸如文字之类的廉价操作,我不介意,但是如果列 UDF(可能在两侧)导致大聚合或实际上只是被丢弃的其他长度过程怎么办?

标签: apache-sparkpysparkapache-spark-sql

解决方案


在 PySpark 中,UDF 是预先计算的,因此您会得到这种次优的行为。您也可以从查询计划中看到它:

== Physical Plan ==
*(2) Project [id#753L, 1 AS A#755, 2 AS B#758, pythonUDF1#776 AS udf#763, CASE WHEN (pythonUDF1#776 < 100) THEN 1.0 ELSE pythonUDF2#777 END AS res#769]
+- ArrowEvalPython [simple_mul_udf(1, 2), simple_mul_udf(1, 2), udf_that_throws_exception(0)], [id#753L, pythonUDF0#775, pythonUDF1#776, pythonUDF2#777]
   +- *(1) Range (0, 5, step=1, splits=8)

ArrowEvalPython操作员负责计算 UDF,然后在操作员中评估条件Project

调用条件(最佳行为)时出现不同行为的原因df.sql是,这是一种特殊情况,其中此列中的值是恒定的(列AB都是恒定的),Spark 优化器可以事先对其进行评估(在查询计划处理期间的驱动程序中,在集群上执行实际作业之前),因此它知道otherwise永远不必评估条件的分支。如果此sql列中的值是动态的(例如在id列中),则行为将再次次优,因为 Spark 事先不知道该otherwise部分永远不会发生。

如果您想避免这种次优行为(otherwise即使不需要也调用 udf),一种可能的解决方案是在 udf 中评估此条件,例如如下:

@pandas_udf("int", PandasUDFType.SCALAR)
def udf_with_cond(*cols):
    result = cols[0]
    for c in cols[1:]:
        result *= c

    if((result < 100).any()):
        return result
    else:
        raise Exception('Error')

df = df.withColumn('res', udf_with_cond('A', 'B'))

推荐阅读