首页 > 解决方案 > Kafka 消费者错误流帧描述符损坏

问题描述

我在我的 Springboot 应用程序中使用带有 StringSerializer 和 StringDeserializer 的 kafka 生产者和消费者。我可以很好地将事件写入我的 kafka 主题。但是消费者因以下错误而失败。

c.w.s.s.config.KafkaConsumerConfig   - [intContainer#0-0-C-1] - Received exception when fetching the next record from MY_TOPIC-0. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from MY_TOPIC-0. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1507)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
    at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
    at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
    ... 13 common frames omitted
Caused by: java.io.IOException: Stream frame descriptor corrupted
    at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
    at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
    ... 17 common frames omitted

消息的保留期仅为 1 小时。我在这里有什么遗漏吗?

编辑#1:

我的生产者配置。

@Bean
public ProducerFactory<String, String> producerFactory() {
  Map<String, Object> configProps = new HashMap<>();
  configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
  configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

我的消费者配置。

@Bean
public ConsumerFactory<String, String> consumerFactory() {
  Map<String, Object> configProps = new HashMap<>();
  configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
  configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  configProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
  return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  return factory;
}

标签: javaspring-bootapache-kafka

解决方案


推荐阅读