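scala - Why does a Spark Structured Streaming job not terminate even after an exception is thrown?
Problem description
I throw a custom exception to test failure handling in my Structured Streaming job, as shown below. I can see that the query is terminated, but I don't understand why the driver script does not fail with a non-zero exit code.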
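import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

streamingDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .foreachBatch {
    (batchDF: DataFrame, batchId: Long) => {
      val transformedDF: DataFrame = DoSomeProcessing(batchDF)
      // Deliberately fail when batchId is 1 to test failure handling
      if (batchId == 1) {
        throw new Exception("Custom Exception as batchId is 1")
      }
    }
  }
  // ... (the rest of the query definition is not shown in the original post)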
I get the following trace on the console, but the driver script does not exit, and no new logs are printed to the console.
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Custom Exception as batchId is 1
=== Streaming Query ===
Identifier: [id = 6f4c3b4c-bc30-46fe-93ef-8378c23380ab, runId = 1241cb37-493b-4882-ab28-9df8a8c6fb1a]
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
RepartitionByExpression [timestamp#12], 10
...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.Exception: Custom Exception as batchId is 1
at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:61)
at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:57)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
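Solution
I suspect the allowed number of task failures is configured to a value greater than 1, so Spark retries a failing task before giving up on the job. The relevant setting, as described in the Spark configuration documentation:
spark.task.maxFailures (default: 4): Number of failures of any particular task before giving up on the job. The total number of failures spread across different tasks will not cause the job to fail; a particular task has to fail this number of attempts. Should be greater than or equal to 1. Number of allowed retries = this value - 1.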
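To make the "allowed retries = this value - 1" arithmetic concrete, here is a minimal sketch in plain Scala. It is only a simulation of the counting rule quoted above, not Spark's scheduler code: the names MaxFailuresSketch, runTask, Succeeded and GaveUp are hypothetical and exist only for this illustration. In a real job the setting itself would be passed as configuration, for example with --conf spark.task.maxFailures=1 on spark-submit.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Illustrative simulation only; these names are hypothetical and not part of Spark.
object MaxFailuresSketch {
  sealed trait Outcome
  final case class Succeeded(attempts: Int) extends Outcome
  final case class GaveUp(attempts: Int) extends Outcome

  /** Runs `attempt` (given the 1-based attempt number) until it succeeds
    * or has failed `maxFailures` times in total. */
  def runTask(maxFailures: Int)(attempt: Int => Unit): Outcome = {
    require(maxFailures >= 1, "maxFailures should be greater than or equal to 1")

    @tailrec
    def loop(attemptNo: Int): Outcome =
      Try(attempt(attemptNo)) match {
        case Success(_)                            => Succeeded(attemptNo)
        case Failure(_) if attemptNo < maxFailures => loop(attemptNo + 1) // retry
        case Failure(_)                            => GaveUp(attemptNo)   // limit reached
      }

    loop(1)
  }

  def main(args: Array[String]): Unit = {
    // A task that fails on every attempt, like the exception in the question.
    val alwaysFails: Int => Unit =
      _ => throw new Exception("Custom Exception as batchId is 1")

    println(runTask(4)(alwaysFails)) // GaveUp(4): 1 initial attempt + 3 retries
    println(runTask(1)(alwaysFails)) // GaveUp(1): no retries at all
  }
}
```

With the default of 4, a task that always fails is attempted four times before the job is given up; with a value of 1 the first failure is final.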