apache-spark - StreamingQuery.awaitTermination 的目的是什么?
问题描述
我有一个 Spark Structured Streaming 作业,它从 Kafka 主题中读取偏移量并将其写入 aerospike 数据库。目前,我正在准备这项工作生产并实施SparkListener
.
在浏览文档时,我偶然发现了这个例子:
StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination();
执行此代码后,流计算将
在后台启动。查询对象是该活动
流式查询的句柄,我们决定
使用 awaitTermination() 等待查询终止,以防止在查询处于活动状态时进程退出
。
我知道它会在终止进程之前等待查询完成。
究竟是什么意思?它有助于避免查询写入的数据丢失。
当查询每天写入数百万条记录时,它有什么帮助?
我的代码看起来很简单:
dataset.writeStream()
.option("startingOffsets", "earliest")
.outputMode(OutputMode.Append())
.format("console")
.foreach(sink)
.trigger(Trigger.ProcessingTime(triggerInterval))
.option("checkpointLocation", checkpointLocation)
.start();
解决方案
这里有很多问题,但只回答以下一个应该可以回答所有问题。
我知道它会在终止进程之前等待查询完成。究竟是什么意思?
流式查询在单独的守护线程中运行。在 Java 中,守护线程用于允许并行处理,直到 Spark 应用程序的主线程完成(dies)。在最后一个非守护线程完成后,JVM 立即关闭,整个 Spark 应用程序完成。
这就是为什么您需要让主非守护线程等待其他守护线程,以便它们可以完成工作。
推荐阅读
- javascript - 如何根据某些类和 id 将文本写入 html 标签?
- reactjs - 如何在反应中路由到不同的页面?
- javascript - Javascript addEventListener() 显示标签
- postgresql - OVER() 窗口函数中的 ORDER BY 在 postgresql 中是什么意思?
- android - Room DB:如何在不使用 TypeConverter 的情况下使用列表参数初始化实体
- r - 替换那些没有 R 中前导/结束空格的字符串
- routes - 本地测试时微服务如何路由
- java - 两次调用 ActionListener
- java - java 8 将字节分割成块
- embedded - 我在哪里可以检查 CANalyzer 中的 CAN 帧错误?