Spark fails when filtering a dataframe with a UDF-transformed column

Problem description

I'm running into a problem in PySpark (on versions 2.2.0 and 2.3.0) when filtering a dataframe on a column that was previously transformed with a UDF. Below is a minimal failing example. The first show() works, but the second one raises java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, int, true]).

Sample table:

+----+----+
|col1|col2|
+----+----+
|   a|   z|
|   b|   x|
|   b|   x|
|   c|   y|
+----+----+

Sample code:

from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.read.parquet("some_path")
func = udf(lambda x: str(x), StringType())
df = df.withColumn("col2", func("col2"))  # replace col2 with the UDF output
df.show()  # works
df = df.filter(f.col("col2").isin(["z", "x"]))  # filter on the transformed column
df.show()  # fails with UnsupportedOperationException

Stack trace:

Py4JJavaError: An error occurred while calling o200.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, int, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:255)
    at org.apache.spark.sql.execution.python.PythonUDF.eval(PythonUDF.scala:27)
    at org.apache.spark.sql.catalyst.expressions.In.eval(predicates.scala:216)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:38)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:180)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:179)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.prunePartitions(PartitioningAwareFileIndex.scala:179)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:64)
    at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions$lzycompute(DataSourceScanExec.scala:189)
    at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions(DataSourceScanExec.scala:186)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$22.apply(DataSourceScanExec.scala:288)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$22.apply(DataSourceScanExec.scala:287)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.execution.FileSourceScanExec.metadata$lzycompute(DataSourceScanExec.scala:287)
    at org.apache.spark.sql.execution.FileSourceScanExec.metadata(DataSourceScanExec.scala:273)
    at org.apache.spark.sql.execution.DataSourceScanExec$class.simpleString(DataSourceScanExec.scala:54)
    at org.apache.spark.sql.execution.FileSourceScanExec.simpleString(DataSourceScanExec.scala:158)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$DataSourceScanExec$$super$verboseString(DataSourceScanExec.scala:158)
    at org.apache.spark.sql.execution.DataSourceScanExec$class.verboseString(DataSourceScanExec.scala:62)
    at org.apache.spark.sql.execution.FileSourceScanExec.verboseString(DataSourceScanExec.scala:158)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:556)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.generateTreeString(WholeStageCodegenExec.scala:670)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
    at org.apache.spark.sql.execution.InputAdapter.generateTreeString(WholeStageCodegenExec.scala:396)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.generateTreeString(WholeStageCodegenExec.scala:670)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
    at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:480)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:208)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Edit: this seems to fail only for Spark dataframes read from a parquet table. If I test the same code with a dataframe created via spark.createDataFrame, it works fine. Code:

from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Same data as above, but built in memory instead of read from parquet
df = spark.createDataFrame(
        [('a', 'z'),
         ('b', 'x'),
         ('b', 'x'),
         ('c', 'y')],
        ['col1', 'col2'])

func = udf(lambda x: str(x), StringType())
df = df.withColumn("col2", func("col2"))
df.show()
df = df.filter(f.col("col2").isin(["x", "z"]))
df.show()  # works here

Tags: apache-spark, pyspark, apache-spark-sql, user-defined-functions

Solution


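The stack trace offers a strong hint about the cause: the exception is thrown from PartitioningAwareFileIndex.prunePartitions, where Spark evaluates the filter predicate on the driver to prune partitions of the parquet table. A Python UDF is Unevaluable in that context, so pruning fails. This suggests col2 is a partition column of the table, which would also explain why the in-memory createDataFrame version works: there are no file partitions to prune. Below is a minimal sketch of a workaround under that assumption: apply the filter before the UDF, so the pruning predicate never contains the UDF.

from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.read.parquet("some_path")

# Filter first, on the raw column: the pruning predicate is now a plain
# IN expression that Spark can evaluate on the driver.
df = df.filter(f.col("col2").isin(["z", "x"]))

# Apply the UDF after the filter; it runs on the executors as usual.
func = udf(lambda x: str(x), StringType())
df = df.withColumn("col2", func("col2"))
df.show()

An alternative with the same effect would be to write the UDF output to a new column (for example, a hypothetical col2_str) and keep the filter on the original col2, so the partition column is never overwritten by the UDF.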