首页 > 解决方案 > 在 spring kafka consumer 1.2.x 中寻求方法行为

问题描述

我不想为处理失败的那些消息提交偏移量,我希望它们再次被重新传递以进行处理。我正在使用 spring-kafka 1.2.x 并在我的侦听器中实现了ConsumerSeekAware 。

@Component
public class Listener implements ConsumerSeekAware {

    private static Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "my-topic", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listen1(ConsumerRecord<String, String> consumerRecord) throws MyCustomException {
        logger.info("received: key - " + consumerRecord.key() + " value - " + consumerRecord.value());

        // Below code is just to show the issue.Not acknowledging so I can get the same msg again.
        boolean should_commit = false;
        if(should_commit) {
            ack.acknowledge();
        }
        else {
            this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition() , consumerRecord.offset());
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        logger.info("registerSeekCallback called..");
        this.seekCallBack.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
     logger.info("onPartitionsAssigned called..");
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        logger.info("onIdleContainer called..");
    }
}
#########Container 配置(auto.commit 在消费者中为假)
factory.getContainerProperties().setAckOnError(false);
          factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

我面临的问题是,如果我在一个主题的不同分区中有 10 条消息,那么我会一一获取所有消息,并且在获取所有消息后,我会继续获取任何分区的最后一条消息。我还尝试了 SeekToCurrentErrorHandler,它在 2.0.x 版本中实现并且运行良好。但我无法升级我的 kafka 版本。如果我重新启动容器,我会再次收到所有消息,这很好,但我不想在消息处理失败时停止容器。

所以我的问题是是否有可能获得与spring-kafka 1.2.x中的SeekToCurrentErrorHandler相同的行为(完全一样而无需停止容器) ?

标签: javaapache-kafkakafka-consumer-apispring-kafka

解决方案


推荐阅读