spring-cloud-stream - spring-cloud-stream-binder-kafka-streams - 在功能实现中读取 Avro 消息
问题描述
我正在尝试使用来自已在 Avro 中序列化的主题的消息。文档对于这是如何工作的非常混乱。https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_inbound_deserialization
我试图阅读的消息是 avro 序列化消息。我在同一个项目中有键和值的模式,并从模式中生成了类 - 键和值。
我的困惑是,有一些独特的应用程序属性和代码组合可以使其工作。现在我似乎弄错了,我一直在尝试使用一堆属性和代码组合,但它们都不起作用。
我不断收到的错误是
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 7, -46, 15]] from topic [dbserver1.inventory.customers]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690)
看起来默认的 json 序列化器正在启动并试图反序列化 avro 序列化消息。
我的代码如下所示
@SpringBootApplication
class SpringBootKafkaConsumer {
@Bean
fun process(): Consumer<KStream<SpecificAvroSerde<Key>, SpecificAvroSerde<Value>>> {
return Consumer { input -> input.foreach { key, value ->
println("============key = $key")
println("===========value = $value")
}}
}
}
fun main(args: Array<String>) {
runApplication<SpringBootKafkaConsumer>(*args)
}
应用程序.yml
spring:
application:
name: customer-balance
cloud:
stream:
kafka:
streams:
binder:
configuration:
application:
id: customer-balance-1
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
bindings:
process_in:
destination: "dbserver1.inventory.customers"
keySerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
nativeDecoding: true
startOffset: earliest
content-type: application/*+avro
logging:
level:
org.springframework.kafka.config: trace
解决方案
推荐阅读
- r - Imports 字段中的命名空间不是从:'rlang''shinydashboard' 导入的,我应该写 @importFrom rlang rlang 还是 @importFrom rlang
- flutter - Flutter - 无法识别的应用程序。请确保您信任此应用程序,然后再继续
- asp.net - 文件上传未检测到文件扩展名
- sql - 使用 generate_series() 删除行范围?
- android - 防止请求重放攻击
- date - 在嵌套的 IFS 语句中保留 Google 表格中的日期格式
- pspice - 如何解决在 ltspice 的错误日志中发现的问题?
- outlook - 加密的 MailItem 的 PR_ATTACH_FLAGS 属性在移动到另一个文件夹后为内联附件返回 5
- python - 使用行的值作为列
- javascript - CSS - 如何内联表单/输入但使用适当的缩进堆叠每一行?