首页 > 解决方案 > 使用 Spring Cloud Stream Kafka 进行手动确认的批处理

问题描述

我正在尝试使用手动确认将单个消息处理转换为批处理。没有批处理,代码按预期工作。但不知何故,它不适用于批处理。

这是我的属性和代码

spring.cloud.stream.kafka.bindings.inputChannel-in-0.batch-mode=true

 @Bean
    public Consumer<List<Message<?>>>  inputChannel() {
        return batch -> {

 for ( message : batch) {
                log.info("Message Processing Starts :: ");
                Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                

                 // process message and 
           // acknowledge manually
            if(acknowledgement != null ){
                ackwoledge...
            }
    };
}

但我得到 classCastException Caused by: java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class java.util.List (org.springframework.messaging.support.GenericMessage is in unnamed module of loader 'app'; java.util.List 在加载器'bootstrap'的模块 java.base 中)

另外,考虑到批处理的 maxAttempt = 1,如何在此处添加重试功能

标签: spring-bootspring-kafkaspring-cloud-stream

解决方案


推荐阅读