首页 > 解决方案 > 如何在 Spring-kafka 中实现 ConsumerSeekAware?

问题描述

我正在尝试使用@KafkaListener 实现消费者。我正在使用 Spring2.3.7版本。

到目前为止,这是我的代码,

public class SampleListener {

@KafkaListener(topics = "test-topic",
        containerFactory = "sampleKafkaListenerContainerFactory",
        groupId = "test-group")
public void onMessage(@Payload String message,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
                              @Header(KafkaHeaders.OFFSET) long offset,
                              @Headers MessageHeaders messageHeaders) {

    LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
            topic, partition, offset, messageHeaders);
    LOGGER.debug("Received Message payload={}", message);
    doSomething(message);

   }
}

我是 Kafka 和 Spring 的新手。我阅读了 spring-kafka 文档关于如何寻求偏移但无法完全理解。

我的理解,对于我的用例,当将分区分配给容器或在任何其他情况下(确保只读一次)时,我不想再次读取事件。

我看到大多数消费者实现工具ConsumerSeekAware。我知道实施使我们能够在诸如orConsumerSeekAware之类的事件上寻求偏移量。我无法理解这些处理的场景是什么?onIdleContaineronPartitionsAssigned

  1. 实现了哪些场景ConsumerSeekAware来处理?实现需要寻求偏移量的 Kafka Consumer 的最佳实践或一般场景是什么?

  2. registerSeekCallback和 和有什么不一样onPartitionsAssigned?对于两者,它都说只要分配分区就会调用它们。这两种方法的回调有什么区别?

标签: springkafka-consumer-apispring-kafka

解决方案


实施ConsumerSeekAware允许您

一个。在初始化期间寻找特定的偏移量(或开始、结束或由时间戳表示的偏移量。

湾。Peform 在应用程序的生命周期中随时寻找。

如果可能的话,首选技术是扩展AbstractConsumerSeekAware,因为它处理了很多潜在的复杂性。

如果不需要seek,那么就不需要实现接口(或者扩展抽象类)。

我的理解,对于我的用例,当将分区分配给容器或在任何其他情况下(确保只读一次)时,我不想再次读取事件。

容器会自动为您提交偏移量(默认情况下,当 a 返回所有记录时poll(),您可以将容器AckMode属性设置RECORD为在处理完每条记录后提交偏移量)。

下次启动应用程序时,它将从上次提交的偏移量开始使用。

2.

onPartitionsAssigned在分配分区时调用(最初或重新平衡之后)。如果您在那里执行搜索,它们会在重新平衡期间直接调用消费者。

registerSeekCallback被调用来为应用程序提供一个回调句柄,该回调可以在未来的任意时间被调用。如果容器的并发数 > 1,则注册多个回调。当您对这些回调执行搜索时,它们会排队等待消费者线程在下一次轮询之前调用。(消费者不是线程安全的)。抽象类为您管理它并允许更高级别的抽象......

/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
    getSeekCallbacks()
        .forEach((tp, callback) ->
            callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}

/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
    getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
        .seekRelative(topic, partition, -1, true);
}

在即将发布的 2.6.0 版本(本周到期)中,使用 methods 更加简单,seekToBeginning()并且seekToEnd()它将seekToTimeStamp()排队查找所有分配的分区。


推荐阅读