apache-spark - PySpark:反序列化包含在 eventthub 捕获 avro 文件中的 Avro 序列化消息
问题描述
初始情况
AVRO 序列化事件被发送到 azure 事件中心。这些事件使用 azure 事件中心捕获功能永久存储。捕获的数据以及事件中心元数据以 Apache Avro 格式编写。捕获 avro 文件中包含的原始事件应使用 (py)Spark 进行分析。
问题
如何使用 (py)Spark 反序列化包含在 AVRO 文件的字段/列中的 AVRO 序列化事件?(注解:事件的 avro 模式不被阅读器应用程序知道,但它作为 avro 标头包含在消息中)
背景
背景是物联网场景的分析平台。消息由运行在 kafka 上的 IoT 平台提供。为了更灵活地更改模式,战略决策是坚持使用 avro 格式。要启用 Azure 流分析 (ASA),请为每条消息指定 avro 架构(否则 ASA 无法反序列化消息)。
捕获文件 avro 架构
事件中心捕获功能生成的 avro 文件的架构如下所列:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
(请注意,实际消息以字节形式存储在正文字段中)
示例事件 avro 架构
为了说明,我将具有以下 avro 模式的事件发送到事件中心:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [
{"name" : "username","type" : "string"},
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}
示例事件
{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}
示例 avro 消息有效负载
(编码为字符串/注意包含 avro 模式)
Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
所以最后这个有效载荷将作为字节存储在捕获 avro 文件的“正文”字段中。
.
.
我目前的做法
为了便于使用、测试和调试,我目前使用 pyspark jupyter notebook。
Spark 会话的配置:
%%configure
{
"conf": {
"spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
}
}
将 avro 文件读入数据帧并输出结果:
capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()
结果:
+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset| EnqueuedTimeUtc|SystemProperties|Properties| Body|
+--------------+------+--------------------+----------------+----------+--------------------+
| 71| 9936|11/4/2018 4:59:54 PM| Map()| Map()|[4F 62 6A 01 02 1...|
| 72| 10448|11/4/2018 5:00:01 PM| Map()| Map()|[4F 62 6A 01 02 1...|
获取 Body 字段的内容并将其转换为字符串:
msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])
这就是我让代码工作的程度。花了很多时间尝试反序列化实际消息,但没有成功。我将不胜感激任何帮助!
一些附加信息:Spark 在 Microsoft Azure HDInsight 3.6 集群上运行。Spark 版本是 2.2。Python 版本是 2.7.12。
解决方案
您要做的是应用于.decode('utf-8')
Body 列中的每个元素。您必须从解码创建UDF ,以便您可以应用它。UDF 可以用
from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))
以下是将 IoT 中心存储的 avro 文件解析为自定义 Blob 存储端点的完整示例:
storage_account_name = "<YOUR STORACE ACCOUNT NAME>"
storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"
# Read all files from one day. All PartitionIds are included.
file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
file_type = "avro"
# Read raw data
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
storage_account_access_key)
reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)
# Decode Body into strings
from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))
jsons = raw.select(
raw['EnqueuedTimeUtc'],
raw['SystemProperties.connectionDeviceId'].alias('DeviceId'),
decodeElements(raw['Body']).alias("Json")
)
# Parse Json data
from pyspark.sql.functions import from_json
json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')
Disclamer:我对 Python 和 Databricks 都是新手,我的解决方案可能并不完美。但是我花了一天多的时间来完成这项工作,我希望这对某人来说是一个很好的起点。
推荐阅读
- neo4j - 我正在尝试将 json 文件导入 neo4j,
- spring - 如何在 SimpleJDBC 中注入数据源
- php - 如何将 jquery 变量从一个 php 文件中获取到另一个?
- laravel - 在 laravel 5.6 中我的刀片视图有一个未定义的索引问题
- python - 用于解码文本信息的字典
- python - 如何在邮递员中读取上传的excel文件?
- javascript - 为什么将 Math.round 输入乘以 1000 然后除以结果?
- c - 将直到EOF的行读入C中的指针
- vba - vba 代码适用于一张工作表,但不适用于第二张工作表
- vba - 使用 ListObject 列引用对表进行排序