apache-spark - Spark fails when filtering a dataframe on a column transformed by a UDF
Problem description
I'm hitting a problem in pyspark (on versions 2.2.0 and 2.3.0) when filtering a dataframe by a column that was previously transformed with a UDF. Here is a minimal failing example. The first show() works, but the second raises java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, int, true]).
Sample table:
+----+----+
|col1|col2|
+----+----+
| a| z|
| b| x|
| b| x|
| c| y|
+----+----+
Sample code:
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.read.parquet("some_path")
func = udf(lambda x: str(x), StringType())
df = df.withColumn("col2", func("col2"))
df.show()
df = df.filter(f.col("col2").isin(["z", "x"]))
df.show()
Stack trace:
Py4JJavaError: An error occurred while calling o200.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, int, true])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:255)
at org.apache.spark.sql.execution.python.PythonUDF.eval(PythonUDF.scala:27)
at org.apache.spark.sql.catalyst.expressions.In.eval(predicates.scala:216)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:38)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:180)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$10.apply(PartitioningAwareFileIndex.scala:179)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.prunePartitions(PartitioningAwareFileIndex.scala:179)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:64)
at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions$lzycompute(DataSourceScanExec.scala:189)
at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions(DataSourceScanExec.scala:186)
at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$22.apply(DataSourceScanExec.scala:288)
at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$22.apply(DataSourceScanExec.scala:287)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.execution.FileSourceScanExec.metadata$lzycompute(DataSourceScanExec.scala:287)
at org.apache.spark.sql.execution.FileSourceScanExec.metadata(DataSourceScanExec.scala:273)
at org.apache.spark.sql.execution.DataSourceScanExec$class.simpleString(DataSourceScanExec.scala:54)
at org.apache.spark.sql.execution.FileSourceScanExec.simpleString(DataSourceScanExec.scala:158)
at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:178)
at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$DataSourceScanExec$$super$verboseString(DataSourceScanExec.scala:158)
at org.apache.spark.sql.execution.DataSourceScanExec$class.verboseString(DataSourceScanExec.scala:62)
at org.apache.spark.sql.execution.FileSourceScanExec.verboseString(DataSourceScanExec.scala:158)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:556)
at org.apache.spark.sql.execution.WholeStageCodegenExec.generateTreeString(WholeStageCodegenExec.scala:670)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.execution.InputAdapter.generateTreeString(WholeStageCodegenExec.scala:396)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.execution.WholeStageCodegenExec.generateTreeString(WholeStageCodegenExec.scala:670)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:480)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:208)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Edit: this seems to fail only for Spark dataframes backed by a Parquet table. The same code works fine when tested on a dataframe created directly with spark.createDataFrame:
df = spark.createDataFrame(
[('a', 'z'),
('b', 'x'),
('b', 'x'),
('c', 'y')],
['col1', 'col2'])
func = udf(lambda x: str(x), StringType())
df = df.withColumn("col2", func("col2"))
df.show()
df = df.filter(f.col("col2").isin(["x", "z"]))
df.show()
Solution