首页 > 解决方案 > 火花 writeStream 进入 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别

问题描述

根据官方文档,我使用下面的代码段写入 kafka 主题,但它没有写入 kafka。

finalStream = final \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers",bootstrap_servers) \
  .option("topic",topic_name) \
  .option("checkpointLocation", check_point_location) \
  .start()

finalStream.awaitTermination()

但是通过使用awaitAnyTermination()而不是awaitTermination(),写入卡夫卡作品。

spark.streams.awaitAnyTermination()

请提出这背后的原因。

标签: apache-sparkpysparkapache-kafkaspark-structured-streaming

解决方案


“awaitTermination() 与 awaitAnyTermination() 之间的区别”

引用源代码中的注释

awaitTermination : "等待这个查询的终止,无论是query.stop()由异常还是由异常终止。如果查询因异常终止,则抛出异常。否则,在timeoutMs毫秒内返回查询是否已终止。如果查询已终止,则对该方法的所有后续调用将true立即返回(如果查询由 终止stop()),或立即抛出异常(如果查询已因异常终止)。"

awaitAnyTermination : "等到关联的 SQLContext 上的任何查询自上下文创建后终止,或者自resetTerminated()调用以来。如果任何查询因异常而终止,则将引发异常。如果查询已终止,则后续调用awaitAnyTermination()将立即返回(如果查询由 终止query.stop()),或立即抛出异常(如果查询因异常终止)。resetTerminated()用于清除过去的终止并等待新的终止。在多个查询已终止的情况下由于resetTermination()被调用,如果任何查询因异常而终止,则awaitAnyTermination()将抛出任何异常。为了正确记录多个查询中的异常,用户需要在其中任何一个异常终止后停止所有异常,然后检查query.exception()每个查询。”


推荐阅读