在逗号上拆分消息,java,spring,spring-boot,apache-kafka,spring-kafka"/>

首页 > 解决方案 > Spring Boot BatchAcknowledgeingMessageListener在逗号上拆分消息

问题描述

我有一个带有实现 BatchAcknowledgeingMessageListener<String, String> 接口的 Kafka 侦听器的 Spring Boot 应用程序。当我收到来自主题的单条消息时,它实际上是原始消息中每一行的一条消息,我无法将消息转换为 ConsumerRecord<String, String>。

生成记录的代码如下所示:

this.kafkaTemplate.send("myTopic", "12345", "{\"OrderID\": \"12345\"}, \"OrderDate\": \"2021-06-01T12:13:16Z\"");

Kafka 配置看起来像这样(这仍处于使用 Testcontainers 的集成测试阶段,因此生产者正在生产消费者正在收听的同一主题):

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
    consumer:
      bootstrap-servers: localhost:9093
      enable-auto-commit: false
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
      topic: myTopic
    producer:
      bootstrap-servers: localhost:9093
      client-id: my-client
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      topic: myTopic

最后,消费者逻辑:

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
  // This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
  // for (final ConsumerRecord<String, String> record : records) {

  for (final Object record : records) {
    log.debug("Record: {}", record);
  }
  ...
}

此示例的调试输出为:

[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}

如您所见,消息以逗号分隔,并且我收到多条消息,生成的单条消息。这显然是失败的,但我不明白为什么我不只是获得单个 ConsumerRecord<String, String> 对象。

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


您缺少侦听器类型配置,因此默认转换服务会看到您需要一个列表并用逗号分隔字符串。

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
      type: batch
    consumer:
...

添加type: batch告诉框架您想要整批记录。


推荐阅读