How to consume messages from a Kafka topic using Protocol Buffers (protobuf) with the spring-cloud-stream-binder-kafka-streams dependency?

Problem description

I have an application that consumes messages in protobuf format, and when I run it I get this error:

Exception in thread "NotificationProcessorService-process-applicationId-0300a3f8-6dab-4f3f-a631-8719178823ce-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 5, 0, 10, 8, 57, 53, 52, 50, 56, 51, 51, 51, 16, -7, -12, -106, -97, -119, 47, 26, 6, 57, 56, 55, 56, 54, 55, 34, 4, 56, 54, 50, 50, 42, 6, 56, 57, 55, 51, 50, 57, 50, 5, 80, 82, 73, 77, 69, 58, 5, 56, 55, 57, 50, 51, 65, 31, -123, -21, 81, -72, 93, -108, 64, 72, 2, 82, 6, 67, 82, 69, 68, 73, 84, 89, 31, -123, -21, 81, -72, 93, -108, 64, 97, -26, -48, 34, -37, -7, 74, 64, 64, 105, -26, -48, 34, -37, -7, 74, 64, 64, 113, -26, -48, 34, -37, -7, 74, 64, 64, 122, 4, 77, 65, 73, 76]] from topic [pos-proto-topic]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x4ff0a08 (above 0x0010ffff) at char #1, byte #7)
    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:250)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2384)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672)
    at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2064)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

I think the application expects JSON messages by default (the org.springframework.kafka JsonDeserializer in the stack trace suggests as much), and I need to change some configuration to say "hey, I'm expecting protobuf messages here." I feel like I've searched the entire internet and haven't found how to set this.

Here is my application.yaml file:

spring:
  cloud:
    stream:
      bindings:
        notification-input-channel:
          destination: pos-proto-topic

        notification-output-channel:
          destination: notification-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-output-channel:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde

I am also using Hoxton.SR9 as my spring-cloud.version. Does anyone know how to fix this?

Tags: java, protocol-buffers, apache-kafka-streams, spring-cloud-stream-binder-kafka, protobuf-java

Solution


You need to set the consumer-side valueSerde:

spring.cloud.stream.kafka.streams.bindings.<channel-name>-in-0.consumer.valueSerde
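
Applied to the configuration in the question, where the input binding is named notification-input-channel rather than following the functional <functionName>-in-0 convention, a minimal sketch of the consumer side might look like this (mirroring the KafkaProtobufSerde already used on the producer binding):

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            notification-input-channel:
              consumer:
                # assumption: same Confluent protobuf Serde as on the producer binding
                valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde

If the Serde should deserialize into a specific generated protobuf class rather than a DynamicMessage, you may also need to pass Confluent's specific.protobuf.value.type property (set to the fully qualified class name) in the binder's configuration block, next to the schema.registry.url already defined there.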
