spark-streaming - 增量表合并操作期间的 java.lang.NullPointerException
问题描述
我有一个流管道,它使用带有Trigger.Once()的文件源数据源,每两个小时运行一次。我将此流与foreachBatch方法一起使用,以使用合并操作更新增量表。值得一提的是,我使用 1 个集群和 8 个工作人员更新了 +100 个增量表,但同时不超过 24 个(编排机制)。有时我在执行合并操作时会看到以下错误:
FileStreamSource[s3://bucket/path]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:379)
at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:269)
Caused by: java.lang.NullPointerException
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.$anonfun$improveUnsupportedOpError$2(AnalysisHelper.scala:60)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.$anonfun$improveUnsupportedOpError$2$adapted(AnalysisHelper.scala:60)
at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
at scala.collection.immutable.List.exists(List.scala:89)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.isExtensionOrCatalogError$1(AnalysisHelper.scala:60)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:64)
at com.databricks.sql.transaction.tahoe.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:51)
at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:126)
at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:239)
合并操作失败后,会执行重试(编排机制),现在可以顺利完成。
我还检测到(这是我真正的问题)存档过程没有按预期工作,可能是因为合并操作中的这些失败。每周/每月每个表都有几个文件永久保存在输入路径中,并且永远不会移动到存档路径中。这将迫使我在不久的将来以不同的方式将这些“被遗忘”的输入文件移动到存档路径,而这正是我想要尽可能避免的。
根据文档的存档过程是尽力而为,因此,当应用程序没有正常关闭(可能是合并操作失败)或排队清理的文件过多(我没有那么多每个微批次中的输入文件)。我正在使用 Databricks Runtime 7.3.x-scala2.12,它带有 delta Lake 0.7.0,这些是我拥有的配置:
"spark.sql.sources.commitProtocolClass": "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
"spark.databricks.delta.schema.autoMerge.enabled": "true",
"parquet.enable.summary-metadata": "false",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.sql.parquet.mergeSchema": "true",
"spark.sql.shuffle.partitions": "32",
"spark.databricks.delta.autoCompact.enabled": "true",
"spark.sql.streaming.schemaInference": "true",
"spark.databricks.delta.optimizeWrite.enabled": "true"
我将不胜感激这方面的任何帮助。
解决方案
推荐阅读
- javascript - 在 forEach 循环运行后通过 express 发送 sequelize 结果
- python - 通过 python 使用 selenium 和 webdriver 执行测试对网络有任何影响吗?
- angular - 如何在剑道网格中重用 kendoGridHeaderTemplate
- jenkins - 如何将秘密文本文件转储到控制台?
- mongodb - 是否可以在 MongoDB 中查询哪些 LineStrings 靠近另一个 LineString?
- websocket - Python Autobahn Websocket 服务器一次接收一个连接
- java - 方法执行多次而不是一次
- ruby-on-rails - 从多个哈希数组构建一个新的哈希数组
- pyomo - 可选输入数据
- annotations - 无法使用@EJB 注解注入 EJB