首页 > 解决方案 > 使用 kafka-python ParseFromString 的 Protobuf 消息解码错误

问题描述

我有一个如下的 protobuf 模型:即protoObj

sample {
  barcode: "BAR000002"
  requisition_id: "ORDER-1"
  patient_id: "P00002"
  cancer_type: "CRC"
}
assay {
  key: "lunar2"
  title: "Lunar2"
  version: "3.0"
}
lunar2 {
  type: REPORT_TYPE_COMPLETED
  version: 1
  limitations: "TBD"
  disclaimers: "TBD"
  header {
    test_name: "XYZ"
    manufacturer: "ABCD"
    sofware_verision: "TBD"
    regulatory_status: "IVD"
  }
  content {
    requisition_id: "ORDER-1"
    created {
      seconds: 1631736582
      nanos: 986649036
    }
    result: "ctDNA DETECTED"
  }
}

现在我将其发送给 Kafka 生产者,如下所示

kafkaProducer.send('topic-test', protoObj.SerializeToString())

现在在消费者端,我想在 JSON 上反序列化它并将这个 JSON 发送到下游服务,我在下面做:

 def consumerMessage(self):
        for msg in self.kafkaConsumer:
            prototype_pb2.Proto().ParseFromString(msg.value)
            print(prototype_pb2)

但我收到以下错误:

google.protobuf.message.DecodeError: Error parsing message

我不明白问题到底出在哪里。请帮忙。

标签: deserializationprotocol-bufferskafka-python

解决方案


推荐阅读