java - 流处理器的close()init()方法:重新平衡发生时流线程的行为?
问题描述
我想知道Kafka streams
(我正在使用低级 Java API)的行为。我Stream processor
通过实现 interface 来实现org.apache.kafka.streams.processor.Processor
。
org.apache.kafka.streams.processor.Processor
有以下方法:
无效初始化(处理器上下文上下文);
无效关闭();
假设 kafka 主题有 6 个分区,并且在我的流应用程序中它已经num.stream.threads=1
并且已经SIX instances of streaming application
在不同的机器上运行。所以这意味着每个流线程将只分配一个分区。
假设其中一台机器崩溃,然后我们剩下五台机器。现在,这将触发重新平衡,当它发生时,我有以下问题:
当重新平衡发生时,StreamThread 会死吗?由于 StreamThread 是一个线程,所以在重新平衡期间它们是保持“活动”还是所有 Stream 线程都被“杀死”并再次创建?
在创建 StreamProcessor 实例或每次重新平衡或创建 StreamThread 时是否调用 init() / close()?基本上想知道在什么阶段调用这些方法以及与重新平衡/创建流线程/创建处理器实例时或创建流任务时的关系。
如何以编程方式使客户离开组?但是,我确实对此进行了搜索,但得到了不相关的结果。
解决方案
当重新平衡发生时,StreamThread 会死吗?由于 StreamThread 是一个线程,所以在重新平衡期间它们是保持“活动”还是所有 Stream 线程都被“杀死”并再次创建?
不,线程保持活跃。(只有来自崩溃机器的线程显然会死掉。)
在创建 StreamProcessor 实例或每次重新平衡或创建 StreamThread 时是否调用 init() / close()?基本上想知道在什么阶段调用这些方法以及与重新平衡/创建流线程/创建处理器实例时或创建流任务时的关系。
这取决于版本。在旧版本(2.3.x 或更早版本)中,在重新平衡期间,所有任务都会暂停(即暂停),因此close()
将被调用。如果现有任务被恢复(或迁移并因此重新创建)init()
被调用。因此,基本上当 aStreamThread
启动时,它首先会触发重新平衡,在分配分区后,创建任务并进行相应init()
的调用。对于现有的StreamThreads
,当触发重新平衡时,所有任务都会暂停(即,调用close()
)并重新分配以及重新启动新任务。
在较新的版本(2.4.x 和更新版本)中,完成了增量重新平衡,因此在重新平衡期间任务不再暂停。只有当一项任务从一个迁移StreamThread
到另一个时,该任务才会在一个线程上关闭并在新线程上重新初始化。
如何以编程方式使客户离开组?但是,我确实对此进行了搜索,但得到了不相关的结果。
不知道你到底是什么意思。但是,您可以调用KafkaStreams#close()
以停止其所有本地StreamThreads
线程,因此这些线程最终会离开该组。
推荐阅读
- webview - 无法在 WebView 上运行 WebGL 内容
- r - R data.table 如果然后 sumif 使用连接查找
- mysql - 如何设计考虑更新操作的一对多关系表?
- c# - 插入记录后网格视图显示不正确的值
- sql - 如何从 textarea 使用 PHP 向多列插入值?
- python - 获取字典的缺失日期
- oracle - Oracle SYS_CONNECT_BY_PATH 无法正常工作(始终为空)
- c# - 将项目添加到通用列表时出现问题
- jenkins - 有没有办法在 Jenkinsfile 中使用“propagate=false”直接用于阶段/步骤的声明性语法?
- html - css-如何对齐所有文本框?