How to deserialize Avro data from MQTT messages?

Problem description

I am receiving serialized (Avro) data as MQTT messages. A message looks like this:

Objavro.codecnullavro.schemaº{"type": "record", "name": "User", "namespace": "example.avro", "fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}]} Œpq+±)žJ@xX·,Alyssa €Ben redŒpq+±)žJ@xX·
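The dump appears to be a complete Avro object container file rather than a single serialized record: it starts with the 4-byte magic Obj\x01, then header metadata (avro.codec, avro.schema) and a 16-byte sync marker that repeats after the records. A minimal sketch for telling the two apart, assuming the payload is available as raw bytes; the names AVRO_CONTAINER_MAGIC and is_avro_container are made up for illustration:

```python
AVRO_CONTAINER_MAGIC = b"Obj\x01"  # first 4 bytes of every Avro object container file


def is_avro_container(payload):
    """Return True if payload looks like a whole Avro container file.

    Such a payload carries its own header (avro.codec, avro.schema) and sync
    markers, so it needs a file-level reader, not a bare datum decoder.
    """
    if isinstance(payload, str):
        raise TypeError("need the raw bytes, not a decoded str")
    return bytes(payload[:4]) == AVRO_CONTAINER_MAGIC


print(is_avro_container(b"Obj\x01\x04\x14avro.codec"))  # True
print(is_avro_container(b"\x0cAlyssa"))                 # False
```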

I have to deserialize this data in Python 3 using the known schema user.avsc:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
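For reference, this is roughly how that schema maps onto the Avro binary encoding of one record: fields in declared order, int as a zig-zag varint, string as a length followed by UTF-8 bytes, and a union as a branch index followed by the value. The stdlib-only sketch below hand-decodes the User schema only; it is not part of the avro package, read_long, read_string, read_branch and read_user are hypothetical helper names, and the sample bytes are assumed to match the two records in the message above.

```python
import io


def read_long(stream):
    """Read an Avro int/long: a zig-zag encoded base-128 varint."""
    shift = accum = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("truncated varint")
        accum |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:
            return (accum >> 1) ^ -(accum & 1)  # undo zig-zag
        shift += 7


def read_string(stream):
    """Read an Avro string: a long byte length, then UTF-8 bytes."""
    size = read_long(stream)
    data = stream.read(size)
    if len(data) != size:
        raise EOFError("truncated string")
    return data.decode("utf-8")


def read_branch(stream):
    """Read the branch index of a two-way union like ["int", "null"]."""
    branch = read_long(stream)
    if branch not in (0, 1):
        raise ValueError(f"unexpected union branch {branch}")
    return branch


def read_user(stream):
    """Decode one User record, field by field, in schema order."""
    name = read_string(stream)
    # ["int", "null"]: branch 0 carries an int, branch 1 is null
    favorite_number = read_long(stream) if read_branch(stream) == 0 else None
    # ["string", "null"]: branch 0 carries a string, branch 1 is null
    favorite_color = read_string(stream) if read_branch(stream) == 0 else None
    return {"name": name, "favorite_number": favorite_number,
            "favorite_color": favorite_color}


# Two records back to back (assumed reconstruction of the data above).
stream = io.BytesIO(b"\x0cAlyssa\x00\x80\x04\x02" b"\x06Ben\x00\x0e\x00\x06red")
print(read_user(stream))  # {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}
print(read_user(stream))  # {'name': 'Ben', 'favorite_number': 7, 'favorite_color': 'red'}
```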

The deserialized data should look like this:

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

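The example at https://avro.apache.org/docs/current/gettingstartedpython.html writes and reads the data through DataFileWriter/DataFileReader, but it would be great to use it on the fly: as each message arrives, the Python code deserializes the data and prints it.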

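The MQTT subscription logic is already in place; for now it just prints the incoming messages. I want to print the deserialized data for each incoming message instead.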
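One way to wire the decoding into an existing subscriber, assuming paho-mqtt: its on_message callback receives (client, userdata, message), and message.payload is already bytes, so it can go straight to a decoder without any str conversion. make_on_message and the decode argument are hypothetical names; decode stands for whatever function turns the raw bytes into records. The stand-in message below is only for trying it out without a broker.

```python
from types import SimpleNamespace


def make_on_message(decode, sink=print):
    """Build a callback with paho-mqtt's on_message shape (client, userdata, msg).

    decode: hypothetical function taking the raw payload bytes and returning
            an iterable of decoded records.
    sink:   receives each decoded record; print by default.
    """
    def on_message(client, userdata, msg):
        payload = msg.payload
        # Hand the raw bytes straight to the decoder; never str() or .decode() them.
        if isinstance(payload, str):
            raise TypeError("payload is already text; the Avro bytes are gone")
        for record in decode(bytes(payload)):
            sink(record)

    return on_message


# Usage with a stand-in message object (a real client passes an MQTTMessage).
received = []
callback = make_on_message(lambda data: [data[:3]], sink=received.append)
callback(None, None, SimpleNamespace(payload=b"Obj\x01"))
print(received)  # [b'Obj']
```

With a real client this would be attached as client.on_message = make_on_message(some_decoder).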

I tried the following (deserialization logic):

import avro.schema
from avro.io import DatumReader, DatumWriter
import io

schema = avro.schema.parse(open("user.avsc", "rb").read())
# message passed here is incoming message
bytes_reader = io.BytesIO(bytes(message, encoding='utf-8'))
decoder = avro.io.BinaryDecoder(bytes_reader)

reader = avro.io.DatumReader(schema)
data = reader.read(decoder)
print(data)

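The code above fails with TypeError: ord() expected a character, but string of length 0 found, because I can't work out the right format for the argument to reader.read(). I use io.BytesIO because the data arrives as a string, which I can't pass in directly, and the example on the Apache page evidently reads the data in binary form and deserializes it from there.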
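A small stdlib-only sketch of why bytes(message, encoding='utf-8') cannot recover the original data. The eleven bytes are assumed to match the Alyssa record in the message above, and decoding via latin-1 is only one assumed way the payload might have become a str; the question does not say how it happened.

```python
# 0x80 0x04 is the varint that encodes favorite_number = 256.
original = b"\x0cAlyssa\x00\x80\x04\x02"

# Strict UTF-8 cannot even decode it: 0x80 is not a valid start byte.
try:
    original.decode("utf-8")
except UnicodeDecodeError as exc:
    print("UTF-8 decode failed:", exc.reason)

# Suppose the payload became text some other way, e.g. via latin-1.
as_text = original.decode("latin-1")
round_tripped = bytes(as_text, encoding="utf-8")  # what the question's code does

print(len(original), len(round_tripped))  # 11 12
print(round_tripped)                      # b'\x0cAlyssa\x00\xc2\x80\x04\x02'
```

Every byte at or above 0x80 comes back as a two-byte sequence, so the decoder reads different varints and lengths than the writer produced.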

Thanks

Tags: python, mqtt, avro

Solution


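If the message you get from MQTT is a string rather than bytes, you probably won't be able to deserialize it. Avro binary that has already been turned into a str cannot simply be encoded back as UTF-8 and decoded: bytes at or above 0x80 do not survive that round trip (depending on how the payload became text, they are replaced or come back as different multi-byte sequences), so the decoder no longer sees the data that was written. You need the actual binary.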
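Once the raw bytes are available (with paho-mqtt, msg.payload is already bytes), note that the message in the question looks like a complete Avro object container file, not a bare datum. With the avro package, the usual route would be to wrap the payload in io.BytesIO, pass it to avro.datafile.DataFileReader together with avro.io.DatumReader(), and iterate over the reader. To show what that reader does without extra dependencies, here is a stdlib-only sketch that handles only the null codec and the User schema; read_container and the other helpers are hypothetical names, and in practice the avro package is the better choice.

```python
import io

MAGIC = b"Obj\x01"  # 4-byte magic at the start of every container file


def read_long(stream):
    """Read an Avro int/long: a zig-zag encoded base-128 varint."""
    shift = accum = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("truncated varint")
        accum |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:
            return (accum >> 1) ^ -(accum & 1)
        shift += 7


def read_bytes(stream):
    """Read Avro bytes/string: a long length followed by that many bytes."""
    size = read_long(stream)
    data = stream.read(size)
    if len(data) != size:
        raise EOFError("truncated bytes")
    return data


def read_branch(stream):
    """Read the branch index of a two-way union like ["int", "null"]."""
    branch = read_long(stream)
    if branch not in (0, 1):
        raise ValueError(f"unexpected union branch {branch}")
    return branch


def read_user(stream):
    """Decode one datum written with the User schema from user.avsc."""
    name = read_bytes(stream).decode("utf-8")
    number = read_long(stream) if read_branch(stream) == 0 else None
    color = read_bytes(stream).decode("utf-8") if read_branch(stream) == 0 else None
    return {"name": name, "favorite_number": number, "favorite_color": color}


def read_container(payload):
    """Yield User records from one complete, uncompressed container file."""
    stream = io.BytesIO(payload)
    if stream.read(4) != MAGIC:
        raise ValueError("payload is not an Avro object container file")
    meta = {}
    while True:  # header metadata: a map of string -> bytes, sent in blocks
        count = read_long(stream)
        if count == 0:
            break
        if count < 0:  # negative count: a block size in bytes follows
            count = -count
            read_long(stream)
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            meta[key] = read_bytes(stream)
    # meta["avro.schema"] holds the writer schema as JSON; this sketch simply
    # assumes it is the User schema instead of resolving it.
    if meta.get("avro.codec", b"null") != b"null":
        raise NotImplementedError("only the null codec is handled here")
    sync = stream.read(16)
    while stream.tell() < len(payload):
        count = read_long(stream)  # number of records in this block
        read_long(stream)          # byte size of the block (unused here)
        for _ in range(count):
            yield read_user(stream)
        if stream.read(16) != sync:
            raise ValueError("sync marker mismatch")
```

Inside an MQTT callback this would be used as: for record in read_container(msg.payload): print(record).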

