首页 > 解决方案 > 从 ConsumerRebalanceListener 方法调用 KafkaConsumer#assignment() 是否安全?

问题描述

我想将 KafkaConsumer 的所有当前分配的分区记录到 Slf4j 的 MDC,以便我可以在日志中显示它并在调试时进行一些关联。我最初想出的一个版本是:

return new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        MDC.put("some-partitions-key", stringify(partitions));
    }
};

但是ConsumerRebalanceListener#onPartitionsAssigned的文档说

以前拥有的分区将不包括在内,即此列表将仅包括新添加的分区

因此,如果我想使用它,我必须自己保留一个分区列表,这似乎并不理想。有一个方法KafkaConsumer#assignment返回所有当前订阅的分区,尽管要使用它,我必须在重新平衡侦听器中引用消费者对象本身:

return new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        MDC.put("some-partitions-key", stringify(consumer.assignment()));
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        MDC.put("some-partitions-key", stringify(consumer.assignment()));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        MDC.put("some-partitions-key", stringify(consumer.assignment()));
    }
};

我的问题是:这是否安全并保证返回正确的结果?我想线程安全本身不会起作用,因为据我了解,消费者是单线程的,并且侦听器仅在KafkaConsumer#poll. 但是是否KafkaConsumer#assignment保证在重新平衡侦听器的上下文中包含所有当前有效的分区?除了在 之后自己做之外,还有更好的方法KafkaConsumer#poll()吗,如下所示?

consumer.poll(duration);
MDC.put("some-partitions-key", stringify(consumer.assignment()));

标签: javaapache-kafkakafka-consumer-api

解决方案


推荐阅读