azure - 使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败
问题描述
我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:
inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)
以下语句失败
rawData.count()
和
org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file
EventHub-Capture 是否写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?
解决方案
实现冷摄取路径的一种模式是使用事件中心捕获。EventHubs 捕获按照窗口参数的定义为每个分区写入一个文件。数据以 avro 格式编写,可以使用 Apache Spark 进行分析。
那么使用此功能的最佳实践是什么?
1.不要过度分区
我经常看到人们使用默认配置,这最终经常导致许多小文件。如果要使用 Spark 使用通过 EventHubs Capture 摄取的数据,请记住Azure Data Lake Store 中文件大小的最佳做法以及使用 Spark 的分区。文件大小应为 ~256 MB,分区大小应在 10 到 50 GB 之间。因此,最终配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。
2.勾选“不发出空文件选项”
您应该检查“不发出空文件选项”。如果你想用 Spark 消费数据,节省不必要的文件操作。
3. 在文件路径中使用数据源
使用流式架构,您的 EventHub 就像在面向批处理的架构方法中的着陆区一样。因此,您将在原始数据层中摄取数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您从工厂中的机器人获取遥测数据,这可能是目录路径/raw/robots/
存储命名需要使用 {Namesapce}、{PartitionId} 等所有属性。因此,最终,具有明确定义的路径、每日分区和使用 Azure Data Lake Gen 2 中文件名的剩余属性的良好捕获文件格式定义可能如下所示:
/raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}
4. 考虑一个压实工作
捕获的数据未压缩,并且在您的用例中也可能最终变成小文件(因为最小写入频率为 15 分钟)。因此,如有必要,请编写一个每天运行一次的压缩作业。就像是
df.repartition(5).write.format("avro").save(targetpath)
会做这项工作。
那么现在读取捕获数据的最佳实践是什么?
5.忽略读取数据的非avro文件
Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳做法是仅使用 avro-extension 读取数据。您可以通过 spark 配置轻松实现此目的:
spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")
6.只读相关分区
考虑只读取相关分区,例如过滤当前摄取日期。
7. 使用共享元数据
读取捕获的数据与直接从 Azure EventHubs 读取数据类似。所以你必须有一个模式。假设您还有使用 Spark 结构化流直接读取数据的作业,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:
[{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]
并用这个简单的解析函数阅读它:
# parse the metadata to get the schema
from collections import OrderedDict
from pyspark.sql.types import *
import json
ds = dbutils.fs.head (metadata) # read metadata file
items = (json
.JSONDecoder(object_pairs_hook=OrderedDict)
.decode(ds)[0].items())
#Schema mapping
mapping = {"string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType}
schema = StructType([
StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])
所以你可以重用你的模式:
from pyspark.sql.functions import *
parsedData = spark.read.format("avro").load(rawpath). \
selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
.select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
.select("EnqueuedTimeUtc", "data.*")
推荐阅读
- c# - 仅当表稳定时,一旦 SQL Server 表上的数据发生更改,如何在 C# 中触发函数?
- java - 为什么这个 while 循环条件不停止循环?
- python - Pandas Ordered Categorical 未按预期工作
- mysql - 如何在mysql中获取表的转置
- r - 在 R 中,在字符串中,有没有办法为正则表达式中被视为特殊的字符添加双反斜杠
- python - Python - 保存到 PDF 屏幕未在 chrome 85.0.4183.102 中显示
- c++ - 无论如何,如果条件与 C++ 模板一起链接多个?
- python - 类型错误:__init__() 缺少 1 个必需的位置参数:“单位”'
- python - 以下用于求幂的分治递归算法是否比用于大数的迭代算法更有效?
- php - 在 Laravel 集合上混合数组