首页 > 解决方案 > spring-cloud-stream-binder-kafka-streams - 在功能实现中读取 Avro 消息

问题描述

我正在尝试使用来自已在 Avro 中序列化的主题的消息。文档对于这是如何工作的非常混乱。https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_inbound_deserialization

我试图阅读的消息是 avro 序列化消息。我在同一个项目中有键和值的模式,并从模式中生成了类 - 键和值。

我的困惑是,有一些独特的应用程序属性和代码组合可以使其工作。现在我似乎弄错了,我一直在尝试使用一堆属性和代码组合,但它们都不起作用。

我不断收到的错误是

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 7, -46, 15]] from topic [dbserver1.inventory.customers]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"�"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690)

看起来默认的 json 序列化器正在启动并试图反序列化 avro 序列化消息。

我的代码如下所示

@SpringBootApplication
class SpringBootKafkaConsumer {

  @Bean
  fun process(): Consumer<KStream<SpecificAvroSerde<Key>, SpecificAvroSerde<Value>>> {
    return Consumer { input -> input.foreach { key, value ->
      println("============key = $key")
      println("===========value = $value")
    }}
  }
}

fun main(args: Array<String>) {
  runApplication<SpringBootKafkaConsumer>(*args)
}

应用程序.yml

spring:
  application:
    name: customer-balance
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: customer-balance-1
            consumer-properties:
              key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
              value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true

      bindings:
        process_in:
          destination: "dbserver1.inventory.customers"
          keySerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          nativeDecoding: true
          startOffset: earliest
          content-type: application/*+avro

logging:
  level:
    org.springframework.kafka.config: trace

标签: spring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


推荐阅读