apache-spark - 火花 writeStream 进入 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别
问题描述
根据官方文档,我使用下面的代码段写入 kafka 主题,但它没有写入 kafka。
finalStream = final \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers",bootstrap_servers) \
.option("topic",topic_name) \
.option("checkpointLocation", check_point_location) \
.start()
finalStream.awaitTermination()
但是通过使用awaitAnyTermination()
而不是awaitTermination()
,写入卡夫卡作品。
spark.streams.awaitAnyTermination()
请提出这背后的原因。
解决方案
“awaitTermination() 与 awaitAnyTermination() 之间的区别”
引用源代码中的注释
awaitTermination : "等待这个查询的终止,无论是query.stop()
由异常还是由异常终止。如果查询因异常终止,则抛出异常。否则,在timeoutMs
毫秒内返回查询是否已终止。如果查询已终止,则对该方法的所有后续调用将true
立即返回(如果查询由 终止stop()
),或立即抛出异常(如果查询已因异常终止)。"
awaitAnyTermination : "等到关联的 SQLContext 上的任何查询自上下文创建后终止,或者自resetTerminated()
调用以来。如果任何查询因异常而终止,则将引发异常。如果查询已终止,则后续调用awaitAnyTermination()
将立即返回(如果查询由 终止query.stop()
),或立即抛出异常(如果查询因异常终止)。resetTerminated()
用于清除过去的终止并等待新的终止。在多个查询已终止的情况下由于resetTermination()
被调用,如果任何查询因异常而终止,则awaitAnyTermination()
将抛出任何异常。为了正确记录多个查询中的异常,用户需要在其中任何一个异常终止后停止所有异常,然后检查query.exception()
每个查询。”
推荐阅读
- python - 打印类实例列表的随机实例但得到 RecursionError python3
- php - 如何从一行中选择多个名称?
- android - 已发布应用程序上的 Sqlite 表和数据库迁移?
- javascript - CSS 透明窗口
- android - 将相机 2 预览锁定到 Android 中的传感器景观
- google-bigquery - 当 Big Query 加载失败且 CSV 表遇到太多错误,放弃时获取更多信息
- node.js - Sitespeed.io 本地显示不兼容的节点模块版本
- c# - 调度程序后台服务中的异步计时器
- python - 下面的 Python 脚本没有给出任何结果
- javascript - 如何在窗口中使用 Google 字体。在 React 中打开