apache-kafka - 如何在 Batch listener 错误处理场景中读取 Header 值
问题描述
我正在尝试在侦听器处处理异常
@KafkaListener(id = PropertiesUtil.ID,
topics = "#{'${kafka.consumer.topic}'}",
groupId = "${kafka.consumer.group.id.config}",
containerFactory = "containerFactory",
errorHandler = "errorHandler")
public void receiveEvents(@Payload List<ConsumerRecord<String, String>> recordList,
Acknowledgment acknowledgment) {
try {
log.info("Consuming the batch of size {} from kafka topic {}", consumerRecordList.size(),
consumerRecordList.get(0).topic());
processEvent(consumerRecordList);
incrementOffset(acknowledgment);
} catch (Exception exception) {
throwOrHandleExceptions(exception, recordList, acknowledgment);
.........
}
}
Kafka 容器配置:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(this.numberOfConsumers);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(getConsumerFactory());
factory.setBatchListener(true);
return factory;
}
}
侦听器错误处理程序 impl
@Bean
public ConsumerAwareListenerErrorHandler errorHandler() {
return (m, e, c) -> {
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
...
};
}
当我尝试在没有批处理的情况下运行相同的内容时,我能够获取分区、主题和偏移值,但是当我启用批处理并尝试对其进行测试时,我在标题中只得到两个值,即 id 和时间戳和未设置其他值。我在这里错过什么了吗?
解决方案
你用的是什么版本?我刚刚用 Boot 2.2.4 (SK 2.3.5) 对其进行了测试,它工作正常......
@SpringBootApplication
public class So60152179Application {
public static void main(String[] args) {
SpringApplication.run(So60152179Application.class, args);
}
@KafkaListener(id = "so60152179", topics = "so60152179", errorHandler = "eh")
public void listen(List<String> in) {
throw new RuntimeException("foo");
}
@Bean
public ConsumerAwareListenerErrorHandler eh() {
return (m, e, c) -> {
System.out.println(m);
return null;
};
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so60152179", "foo");
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so60152179").partitions(1).replicas(1).build();
}
}
spring.kafka.listener.type=batch
spring.kafka.consumer.auto-offset-reset=earliest
GenericMessage [payload=[foo], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f2e787f, kafka_timestampType =[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_receivedTopic=[so60152179], kafka_receivedTimestamp=[1581351585253], kafka_groupId=so60152179}]
推荐阅读
- go - Postgres 自动重新连接
- html - 如何使用 webpack 在静态 html 页面中导入静态资产?
- html - 控制器未定义 laravel
- python - TypeError: super(type, obj) 当使用 exec()
- r - 为什么“长”数据帧比“宽”数据帧占用更多内存?
- jquery - 将带有属性的 XML 文件加载到 JQuery 数组中...数组为空
- android - Android Studio中的自动完成问题给出错误
- bash - < 和 << 使用 readarray 的区别
- memory - VHDL 运行时:无效的内存访问(悬空访问或堆栈大小太小)
- angular - 如何在 Angular 6+ 中实现 systemjs?