首页 > 解决方案 > 增量表合并操作期间的 java.lang.NullPointerException

问题描述

我有一个流管道,它使用带有Trigger.Once()的文件源数据源,每两个小时运行一次。我将此流与foreachBatch方法一起使用,以使用合并操作更新增量表。值得一提的是,我使用 1 个集群和 8 个工作人员更新了 +100 个增量表,但同时不超过 24 个(编排机制)。有时我在执行合并操作时会看到以下错误:

FileStreamSource[s3://bucket/path]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:379)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:269)
Caused by: java.lang.NullPointerException
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.$anonfun$improveUnsupportedOpError$2(AnalysisHelper.scala:60)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.$anonfun$improveUnsupportedOpError$2$adapted(AnalysisHelper.scala:60)
at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
at scala.collection.immutable.List.exists(List.scala:89)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.isExtensionOrCatalogError$1(AnalysisHelper.scala:60)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:64)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:51)
at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:126)
at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:239)

​合并操作失败后,会执行重试(编排机制),现在可以顺利完成。

我还检测到(这是我真正的问题)存档过程没有按预期工作,可能是因为合并操作中的这些失败。每周/每月每个表都有几个文件永久保存在输入路径中,并且永远不会移动到存档路径中。这将迫使我在不久的将来以不同的方式将这些“被遗忘”的输入文件移动到存档路径,而这正是我想要尽可能避免的。

根据文档的存档过程是尽力而为,因此,当应用程序没有正常关闭(可能是合并操作失败)或排队清理的文件过多(我没有那么多每个微批次中的输入文件)。我正在使用 Databricks Runtime 7.3.x-scala2.12,它带有 delta Lake 0.7.0,这些是我拥有的配置:

"spark.sql.sources.commitProtocolClass": "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
"spark.databricks.delta.schema.autoMerge.enabled": "true",
"parquet.enable.summary-metadata": "false",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.sql.parquet.mergeSchema": "true",
"spark.sql.shuffle.partitions": "32",
"spark.databricks.delta.autoCompact.enabled": "true",
"spark.sql.streaming.schemaInference": "true",
"spark.databricks.delta.optimizeWrite.enabled": "true"

我将不胜感激这方面的任何帮助。

标签: spark-streamingdatabricksdelta-lake

解决方案


推荐阅读