apache-kafka-streams - KafkaStreams 没有关闭
问题描述
如果我创建并启动一个 KafkaStream 实例,然后在关闭钩子调用 .close() 中,什么都不会发生 - 日志记录表明存在的一个 StreamThread 已进入 PENDING_SHUTDOWN 状态,但然后就永远这样。我已经尝试了一切都没有运气 - 已经阅读了 Kafka 流源代码以查看它在关闭期间做了什么,但在我看来,代码暗示 StreamThread,如果处于运行状态,将永远不会停止(这不可能是正确的- 我没有在 Kafka JIRA 中看到任何这种性质的错误)。
这是我的简单 KafkaStream (scala) 应用程序的相关代码:
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
p
}
implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())
val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")
in.map((k,v) =>{
println("Consumed and transforming value")
(k,s"$v_transformed")
}).to("output-topic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.stop())
如您所见,它只是从一个主题中读取,对已消费记录中的值进行微小更改并将其写入另一个主题。
流已启动,应用程序将在向其发送 SIGINT 时调用 stop()。
当我 ^C 终止该进程时,我看到 Kafka 日志记录表明 StreamThread-1 正在转换为 PENDING_SHUTDOWN 并且到目前为止。它最终应该(在几秒钟内)达到状态 NOT_RUNNING 但它永远不会,它会继续为它继续从输入主题读取的每条记录输出我的 println 语句。
我在这里做错了什么?
更新:根据评论者的建议,我尝试用 60 秒的超时时间调用 close(),最终得到了这个,但仍然没有关闭:-
[shutdownHook1] INFO o.apache.kafka.streams.KafkaStreams - 流客户端 [-] 流客户端无法在超时内完全停止
解决方案
推荐阅读
- html - 为什么我的底部 div 与我的另一个 div 重叠而不是堆叠?
- java - 骆驼:无法找到 XML 模式命名空间的 Spring NamespaceHandler [http://camel.apache.org/schema/spring]
- javascript - 有没有办法通过javascript更改outlook中的地址?
- c - 如何在字符数组中存储多组字符串
- macos - Xamarin.Forms macOS 启动画面
- node.js - 通过 NodeJS 开发工具包 (aws-sdk-js) 拒绝访问 AWS SQS(突然!!)
- java - 将输入列表与存储库对象列表进行比较:循环执行需要更多时间
- xcode - Xcode 为 ARKit 将 3d 模型转换为 DAE 格式的问题
- c++ - 静态数组的免费动态数组?
- linux - 我的服务器没有监听文件更改