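java - How do I consume protobuf messages from a Kafka topic with the Spring Cloud Stream Kafka Streams binder?
Problem description
I have an application that consumes messages in protobuf format, and when I run it I get this error: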
Exception in thread "NotificationProcessorService-process-applicationId-0300a3f8-6dab-4f3f-a631-8719178823ce-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 5, 0, 10, 8, 57, 53, 52, 50, 56, 51, 51, 51, 16, -7, -12, -106, -97, -119, 47, 26, 6, 57, 56, 55, 56, 54, 55, 34, 4, 56, 54, 50, 50, 42, 6, 56, 57, 55, 51, 50, 57, 50, 5, 80, 82, 73, 77, 69, 58, 5, 56, 55, 57, 50, 51, 65, 31, -123, -21, 81, -72, 93, -108, 64, 72, 2, 82, 6, 67, 82, 69, 68, 73, 84, 89, 31, -123, -21, 81, -72, 93, -108, 64, 97, -26, -48, 34, -37, -7, 74, 64, 64, 105, -26, -48, 34, -37, -7, 74, 64, 64, 113, -26, -48, 34, -37, -7, 74, 64, 64, 122, 4, 77, 65, 73, 76]] from topic [pos-proto-topic]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x4ff0a08 (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:250)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2384)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672)
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2064)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
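I think the application expects JSON messages by default, and I need to change some configuration to say "hey, I'm expecting protobuf messages here". I feel like I have searched the whole internet without finding how to set that.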
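The bytes printed in the exception are consistent with that guess. As a rough check (not part of the original application), the sketch below decodes the first bytes of the failing record under the assumption that it was written with Confluent's Schema Registry framing: a magic byte of 0, a 4-byte big-endian schema ID, a protobuf message-index section (a single 0 byte here), and then the protobuf payload. The class and method names are hypothetical. The leading zero bytes are probably also why Jackson guesses UTF-32 and then fails.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ConfluentHeaderPeek {

    /** First byte of the assumed Confluent framing; expected to be 0. */
    static int magicByte(byte[] record) {
        return record[0];
    }

    /** Schema Registry ID: a 4-byte big-endian int right after the magic byte. */
    static int schemaId(byte[] record) {
        return ByteBuffer.wrap(record, 1, 4).getInt();
    }

    /**
     * Reads the first protobuf field as a string. Assumes byte 5 is the single
     * 0 message-index byte and that the first field is length-delimited with a
     * length below 128. That holds for the sample record, not in general.
     */
    static String firstStringField(byte[] record) {
        int tag = record[6]; // (fieldNumber << 3) | wireType
        if ((tag & 0x07) != 2) {
            throw new IllegalArgumentException("first field is not length-delimited");
        }
        int length = record[7];
        return new String(record, 8, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First 16 bytes copied from the SerializationException message.
        byte[] head = {0, 0, 0, 0, 5, 0, 10, 8, 57, 53, 52, 50, 56, 51, 51, 51};
        System.out.println("magic byte = " + magicByte(head));
        System.out.println("schema id  = " + schemaId(head));
        System.out.println("field 1    = " + firstStringField(head));
    }
}
```

For the sample bytes this reports magic byte 0, schema ID 5, and a first field of "95428333", which is what a Schema Registry protobuf record would look like and not what a JSON deserializer can parse.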
Here is my application.yaml file:
spring:
  cloud:
    stream:
      bindings:
        notification-input-channel:
          destination: pos-proto-topic
        notification-output-channel:
          destination: notification-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-output-channel:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
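I am also using Hoxton.SR9 as the spring-cloud.version. Does anyone know how to solve this?
Solution
You need to set the value Serde on the input (consumer) binding:
spring.cloud.stream.kafka.streams.bindings.<channel-name>-in-0.consumer.valueSerde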
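Applied to the application.yaml from the question, that property might look like the fragment below. This is a sketch under two assumptions: the input binding keeps the name `notification-input-channel` used in the question (the `-in-0` suffix applies to functional-style binding names), and the consumer should use the same Serde class the question already configures for the producer.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            # Assumed binding name, taken from the question's bindings section
            notification-input-channel:
              consumer:
                # Assumed: same Serde as the producer binding in the question
                valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
```

Confluent's protobuf deserializer also has a `specific.protobuf.value.type` setting for choosing the generated message class. Whether it is needed here depends on the application code, which the question does not show.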