首页 > 解决方案 > 流处理器的close()init()方法:重新平衡发生时流线程的行为?

问题描述

我想知道Kafka streams(我正在使用低级 Java API)的行为。我Stream processor通过实现 interface 来实现org.apache.kafka.streams.processor.Processor

org.apache.kafka.streams.processor.Processor 有以下方法:

无效初始化(处理器上下文上下文);

无效关闭();

假设 kafka 主题有 6 个分区,并且在我的流应用程序中它已经num.stream.threads=1并且已经SIX instances of streaming application在不同的机器上运行。所以这意味着每个流线程将只分配一个分区。

假设其中一台机器崩溃,然后我们剩下五台机器。现在,这将触发重新平衡,当它发生时,我有以下问题:

标签: javaapache-kafkaapache-kafka-streams

解决方案


当重新平衡发生时,StreamThread 会死吗?由于 StreamThread 是一个线程,所以在重新平衡期间它们是保持“活动”还是所有 Stream 线程都被“杀死”并再次创建?

不,线程保持活跃。(只有来自崩溃机器的线程显然会死掉。)

在创建 StreamProcessor 实例或每次重新平衡或创建 StreamThread 时是否调用 init() / close()?基本上想知道在什么阶段调用这些方法以及与重新平衡/创建流线程/创建处理器实例时或创建流任务时的关系。

这取决于版本。在旧版本(2.3.x 或更早版本)中,在重新平衡期间,所有任务都会暂停(即暂停),因此close()将被调用。如果现有任务被恢复(或迁移并因此重新创建)init()被调用。因此,基本上当 aStreamThread启动时,它首先会触发重新平衡,在分配分区后,创建任务并进行相应init()的调用。对于现有的StreamThreads,当触发重新平衡时,所有任务都会暂停(即,调用close())并重新分配以及重新启动新任务。

在较新的版本(2.4.x 和更新版本)中,完成了增量重新平衡,因此在重新平衡期间任务不再暂停。只有当一项任务从一个迁移StreamThread到另一个时,该任务才会在一个线程上关闭并在新线程上重新初始化。

如何以编程方式使客户离开组?但是,我确实对此进行了搜索,但得到了不相关的结果。

不知道你到底是什么意思。但是,您可以调用KafkaStreams#close()以停止其所有本地StreamThreads线程,因此这些线程最终会离开该组。


推荐阅读