首页 > 解决方案 > Apache Avro - 读取二进制流

问题描述

我正在尝试构建一个摄取 Avro 流消息的 Kafka 消费者。这是我阅读 Avro 消息文件和工作的资源:(https://avro.apache.org/docs/current/gettingstartedpython.html

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

reader = DataFileReader(open("weather.avro", "rb"), DatumReader(avro.schema.parse(open("mySchema.avsc").read())))
for user in reader:
    print (user)
reader.close()

但是,我的问题是,如何读取 Avro 流而不是文件?

我尝试了类似的方法,但返回了错误的值。

def avro_decoder(msg_value):
    schema = avro.schema.parse(open("mySchema.avsc").read())
    reader = DatumReader(schema)
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

这是架构

schema = {
              "type": "record",
              "name": "Weather",
              "namespace": "test",
              "doc": "A weather reading.",
              "fields": [
                {
                  "name": "station",
                  "type": "string"
                },
                {
                  "name": "time",
                  "type": "long"
                },
                {
                  "name": "temp",
                  "type": "int"
                }
              ]
            }

这是 avro 消息:

Objavro.codecnullavro.schemaú{"type": "record", "doc": "A weather reading.", "name": "test.Weather", "fields": [{"name": "station", "type": "string"}, {"name": "time", "type": "long"}, {"name": "temp", "type": "int"}]} ñKœV^¤‹ÿŸÕ>Z:v&011990-99999˜ÒïÖ
 ñKœV^¤‹ÿŸÕ>Z:v

有人可以帮忙吗?

标签: pythonapache-kafkabinaryavro

解决方案


只是我弄清楚这是什么问题。所以问题是我发送了错误的信息!我在 .avro 文件中发送了消息,但实际上,正确的必须是这样的:123111


推荐阅读