java - 从 ConsumerRebalanceListener 方法调用 KafkaConsumer#assignment() 是否安全?
问题描述
我想将 KafkaConsumer 的所有当前分配的分区记录到 Slf4j 的 MDC,以便我可以在日志中显示它并在调试时进行一些关联。我最初想出的一个版本是:
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
MDC.put("some-partitions-key", stringify(partitions));
}
};
但是ConsumerRebalanceListener#onPartitionsAssigned的文档说
以前拥有的分区将不包括在内,即此列表将仅包括新添加的分区
因此,如果我想使用它,我必须自己保留一个分区列表,这似乎并不理想。有一个方法KafkaConsumer#assignment返回所有当前订阅的分区,尽管要使用它,我必须在重新平衡侦听器中引用消费者对象本身:
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
MDC.put("some-partitions-key", stringify(consumer.assignment()));
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
MDC.put("some-partitions-key", stringify(consumer.assignment()));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
MDC.put("some-partitions-key", stringify(consumer.assignment()));
}
};
我的问题是:这是否安全并保证返回正确的结果?我想线程安全本身不会起作用,因为据我了解,消费者是单线程的,并且侦听器仅在KafkaConsumer#poll
. 但是是否KafkaConsumer#assignment
保证在重新平衡侦听器的上下文中包含所有当前有效的分区?除了在 之后自己做之外,还有更好的方法KafkaConsumer#poll()
吗,如下所示?
consumer.poll(duration);
MDC.put("some-partitions-key", stringify(consumer.assignment()));
解决方案
推荐阅读
- swift - Swift 只能从 firebase 中看到一个值(检索数据)
- c# - 是否有适当的解决方法来重载需要采用相同值类型的方法,还是应该避免?
- windows - 更新显卡不更新opengl32.dll
- php - 如何从laravel中的外键中提取数据?
- c++ - 需要一些帮助来理解类继承(C++)
- c - A96开发板 GPRS C SDK 搭建环境安装Windows
- python - 检查目标时出错:预期 dense_3 有 2 个维度,但得到了形状为 (10, 10, 2) 的数组
- javascript - 解决在chrome中运行的tensorflow js的“未捕获(承诺)TypeError:fs.writeFile不是函数”的替代方法
- beautifulsoup - 如何在python中用汤检索最后一个页码
- docker - “码头集装箱 rm
” 与 “搬运工 rm "