How to read header values in a batch listener error-handling scenario

Problem description

I am trying to handle exceptions in the listener:

@KafkaListener(id = PropertiesUtil.ID,
        topics = "#{'${kafka.consumer.topic}'}",
        groupId = "${kafka.consumer.group.id.config}",
        containerFactory = "containerFactory",
        errorHandler = "errorHandler")
public void receiveEvents(@Payload List<ConsumerRecord<String, String>> consumerRecordList,
                          Acknowledgment acknowledgment) {
    try {
        log.info("Consuming the batch of size {} from kafka topic {}", consumerRecordList.size(),
                consumerRecordList.get(0).topic());
        processEvent(consumerRecordList);
        incrementOffset(acknowledgment);
    } catch (Exception exception) {
        throwOrHandleExceptions(exception, consumerRecordList, acknowledgment);
        // ...
    }
}

Kafka container configuration:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(this.numberOfConsumers);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setConsumerFactory(getConsumerFactory());
    factory.setBatchListener(true);
    return factory;
}
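
The getConsumerFactory() method referenced above is not shown in the question; a minimal sketch of what it could look like, assuming String keys and values (the bootstrapServers and groupId fields are placeholders, not from the original post; imports omitted as in the other snippets):

public ConsumerFactory<String, String> getConsumerFactory() {
    // Hypothetical consumer factory; the property values here are assumptions.
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual acks are used above
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}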

Listener error handler implementation:

@Bean
public ConsumerAwareListenerErrorHandler errorHandler() {
    return (m, e, c) -> {
        MessageHeaders headers = m.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
        }
        // ...
    };
}

When I run the same code without batching, I can read the partition, topic, and offset values, but when I enable batching and test it, only two headers are populated, id and timestamp, and the other headers are not set. Am I missing something here?
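
For comparison, in record (non-batch) mode each of those headers holds a single value rather than a List, which is why the same lookups work there. A minimal sketch of the record-mode reads, not from the original post (the log field and bean name are assumed):

@Bean
public ConsumerAwareListenerErrorHandler recordModeErrorHandler() {
    return (m, e, c) -> {
        MessageHeaders headers = m.getHeaders();
        // With a record listener, each header holds one value for the single failed record.
        String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
        Integer partition = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        Long offset = headers.get(KafkaHeaders.OFFSET, Long.class);
        log.info("Failed record {}-{}@{}", topic, partition, offset);
        return null;
    };
}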

Tags: apache-kafka, spring-kafka

Solution


What version are you using? I just tested it with Boot 2.2.4 (Spring Kafka 2.3.5) and it works fine...

@SpringBootApplication
public class So60152179Application {

    public static void main(String[] args) {
        SpringApplication.run(So60152179Application.class, args);
    }


    @KafkaListener(id = "so60152179", topics = "so60152179", errorHandler = "eh")
    public void listen(List<String> in) {
        throw new RuntimeException("foo");
    }

    @Bean
    public ConsumerAwareListenerErrorHandler eh() {
        return (m, e, c) -> {
            System.out.println(m);
            return null;
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60152179", "foo");
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60152179").partitions(1).replicas(1).build();
    }

}
application.properties:

spring.kafka.listener.type=batch
spring.kafka.consumer.auto-offset-reset=earliest

GenericMessage [payload=[foo], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f2e787f, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_receivedTopic=[so60152179], kafka_receivedTimestamp=[1581351585253], kafka_groupId=so60152179}]
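
As the output shows, with a batch listener each of these headers holds a List with one entry per record in the failed batch, so the error handler can walk the lists in parallel, for example to seek each partition back to its lowest failed offset before retrying. A minimal sketch along those lines, not from the original answer (bean name assumed; imports omitted as in the other snippets):

@Bean
public ConsumerAwareListenerErrorHandler seekingErrorHandler() {
    return (m, e, c) -> {
        MessageHeaders headers = m.getHeaders();
        // Each header is a List; element i belongs to record i of the failed batch.
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> lowestOffsets = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            lowestOffsets.merge(new TopicPartition(topics.get(i), partitions.get(i)),
                    offsets.get(i), Math::min);
        }
        // Rewind each partition so the whole batch is redelivered on the next poll.
        lowestOffsets.forEach(c::seek);
        return null;
    };
}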

