首页 > 解决方案 > KafkaStreams 没有关闭

问题描述

如果我创建并启动一个 KafkaStream 实例,然后在关闭钩子调用 .close() 中,什么都不会发生 - 日志记录表明存在的一个 StreamThread 已进入 PENDING_SHUTDOWN 状态,但然后就永远这样。我已经尝试了一切都没有运气 - 已经阅读了 Kafka 流源代码以查看它在关闭期间做了什么,但在我看来,代码暗示 StreamThread,如果处于运行状态,将永远不会停止(这不可能是正确的- 我没有在 Kafka JIRA 中看到任何这种性质的错误)。

这是我的简单 KafkaStream (scala) 应用程序的相关代码:

val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
 p
}


implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())

val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")

in.map((k,v) =>{ 
         println("Consumed and transforming value")
         (k,s"$v_transformed") 
     }).to("output-topic")

val streams: KafkaStreams = new KafkaStreams(builder.build(), props)

streams.start()

sys.addShutdownHook(streams.stop())

如您所见,它只是从一个主题中读取,对已消费记录中的值进行微小更改并将其写入另一个主题。

流已启动,应用程序将在向其发送 SIGINT 时调用 stop()。

当我 ^C 终止该进程时,我看到 Kafka 日志记录表明 StreamThread-1 正在转换为 PENDING_SHUTDOWN 并且到目前为止。它最终应该(在几秒钟内)达到状态 NOT_RUNNING 但它永远不会,它会继续为它继续从输入主题读取的每条记录输出我的 println 语句。

我在这里做错了什么?

更新:根据评论者的建议,我尝试用 60 秒的超时时间调用 close(),最终得到了这个,但仍然没有关闭:-

[shutdownHook1] INFO o.apache.kafka.streams.KafkaStreams - 流客户端 [-] 流客户端无法在超时内完全停止

标签: apache-kafka-streams

解决方案


推荐阅读