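java - Kafka consumer error: Stream frame descriptor corrupted
Problem description
I am using a Kafka producer and consumer with StringSerializer and StringDeserializer in my Spring Boot application. I can write events to my Kafka topic without any problem, but the consumer fails with the following error.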
c.w.s.s.config.KafkaConsumerConfig - [intContainer#0-0-C-1] - Received exception when fetching the next record from MY_TOPIC-0. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from MY_TOPIC-0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1507)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted
at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
... 13 common frames omitted
Caused by: java.io.IOException: Stream frame descriptor corrupted
at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
... 17 common frames omitted
Message retention is only 1 hour. Am I missing something here?
Edit #1:
My producer configuration:
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
My consumer configuration:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    configProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
Solution
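The trace ends in KafkaLZ4BlockInputStream.readHeader, so the failure happens while the consumer parses the header of an LZ4-compressed record batch, before StringDeserializer is ever involved. The message suggests that the frame descriptor bytes, or the header checksum byte that follows them, are not what the reader expects. As a diagnostic aid only, here is a minimal sketch that decodes the start of an LZ4 frame as laid out in the LZ4 frame format specification: magic number, FLG byte, BD byte, optional fields, and the header checksum (HC) byte. The class name Lz4FrameHeaderSketch and its fields are hypothetical and not part of Kafka or Spring Kafka, the sample bytes are made up for illustration, and the HC byte is only read, not verified, because verifying it needs xxHash32, which the JDK does not provide.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper for inspection only; not part of Kafka or Spring Kafka.
final class Lz4FrameHeaderSketch {
    // LZ4 frame magic number; on the wire it appears as the bytes 04 22 4D 18.
    static final int MAGIC = 0x184D2204;

    final int version;               // FLG bits 7-6, expected to be 1
    final boolean blockIndependence; // FLG bit 5
    final boolean blockChecksum;     // FLG bit 4
    final boolean contentSize;       // FLG bit 3
    final boolean contentChecksum;   // FLG bit 2
    final boolean dictId;            // FLG bit 0
    final int maxBlockSizeBytes;     // derived from BD bits 6-4
    final int headerChecksumByte;    // HC byte as stored; NOT verified here

    private Lz4FrameHeaderSketch(int flg, int bd, int hc) {
        version = (flg >>> 6) & 0x3;
        blockIndependence = (flg & 0x20) != 0;
        blockChecksum = (flg & 0x10) != 0;
        contentSize = (flg & 0x08) != 0;
        contentChecksum = (flg & 0x04) != 0;
        dictId = (flg & 0x01) != 0;
        // Size codes 4..7 map to 64 KB, 256 KB, 1 MB, 4 MB.
        maxBlockSizeBytes = 1 << (2 * ((bd >>> 4) & 0x7) + 8);
        headerChecksumByte = hc;
    }

    static Lz4FrameHeaderSketch parse(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        // Smallest header: 4-byte magic + FLG + BD + HC.
        if (in.remaining() < 7) {
            throw new IllegalArgumentException("too short for an LZ4 frame header");
        }
        if (in.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic, not an LZ4 frame");
        }
        int flg = in.get() & 0xFF;
        int bd = in.get() & 0xFF;
        if (((flg >>> 6) & 0x3) != 1) {
            throw new IllegalArgumentException("unsupported frame version");
        }
        int sizeCode = (bd >>> 4) & 0x7;
        if (sizeCode < 4) {
            throw new IllegalArgumentException("invalid block size code " + sizeCode);
        }
        // Optional fields sit between BD and HC: 8-byte content size, 4-byte dictionary ID.
        int skip = ((flg & 0x08) != 0 ? 8 : 0) + ((flg & 0x01) != 0 ? 4 : 0);
        if (in.remaining() < skip + 1) {
            throw new IllegalArgumentException("truncated frame descriptor");
        }
        in.position(in.position() + skip);
        int hc = in.get() & 0xFF;
        return new Lz4FrameHeaderSketch(flg, bd, hc);
    }

    public static void main(String[] args) {
        // Made-up sample: magic, FLG=0x60, BD=0x40, then one HC byte.
        byte[] sample = {0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, (byte) 0x82};
        Lz4FrameHeaderSketch h = parse(sample);
        System.out.println("version=" + h.version
                + " blockIndependence=" + h.blockIndependence
                + " contentSize=" + h.contentSize
                + " maxBlockSizeBytes=" + h.maxBlockSizeBytes
                + " hc=0x" + Integer.toHexString(h.headerChecksumByte));
    }
}
```

If the header of the batches stored in MY_TOPIC looks unexpected, it may be worth checking which client wrote them and with which compression settings, since the producer configuration shown in the question does not set compression.type itself.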