首页 > 解决方案 > 使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败

问题描述

我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)

以下语句失败

rawData.count()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file

EventHub-Capture 是否写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?

标签: azurepysparkazure-eventhubazure-databricksazure-eventhub-capture

解决方案


实现冷摄取路径的一种模式是使用事件中心捕获EventHubs 捕获按照窗口参数的定义为每个分区写入一个文件。数据以 avro 格式编写,可以使用 Apache Spark 进行分析。

那么使用此功能的最佳实践是什么?

1.不要过度分区

我经常看到人们使用默认配置,这最终经常导致许多小文件。如果要使用 Spark 使用通过 EventHubs Capture 摄取的数据,请记住Azure Data Lake Store 中文件大小的最佳做法以及使用 Spark 的分区。文件大小应为 ~256 MB,分区大小应在 10 到 50 GB 之间。因此,最终配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。

2.勾选“不发出空文件选项”

您应该检查“不发出空文件选项”。如果你想用 Spark 消费数据,节省不必要的文件操作。

3. 在文件路径中使用数据源

使用流式架构,您的 EventHub 就像在面向批处理的架构方法中的着陆区一样。因此,您将在原始数据层中摄取数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您从工厂中的机器人获取遥测数据,这可能是目录路径/raw/robots/

存储命名需要使用 {Namesapce}、{PartitionId} 等所有属性。因此,最终,具有明确定义的路径、每日分区和使用 Azure Data Lake Gen 2 中文件名的剩余属性的良好捕获文件格式定义可能如下所示:

 /raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}

在此处输入图像描述

4. 考虑一个压实工作

捕获的数据未压缩,并且在您的用例中也可能最终变成小文件(因为最小写入频率为 15 分钟)。因此,如有必要,请编写一个每天运行一次的压缩作业。就像是

df.repartition(5).write.format("avro").save(targetpath)

会做这项工作。

那么现在读取捕获数据的最佳实践是什么?

5.忽略读取数据的非avro文件

Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳做法是仅使用 avro-extension 读取数据。您可以通过 spark 配置轻松实现此目的:

spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")

6.只读相关分区

考虑只读取相关分区,例如过滤当前摄取日期。

7. 使用共享元数据

读取捕获的数据与直接从 Azure EventHubs 读取数据类似。所以你必须有一个模式。假设您还有使用 Spark 结构化流直接读取数据的作业,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:

[{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]

并用这个简单的解析函数阅读它:

# parse the metadata to get the schema
from collections import OrderedDict 
from pyspark.sql.types import *
import json

ds = dbutils.fs.head (metadata)                                                 # read metadata file

items = (json
  .JSONDecoder(object_pairs_hook=OrderedDict)
  .decode(ds)[0].items())

#Schema mapping 
mapping = {"string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType}

schema = StructType([
    StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])

所以你可以重用你的模式:

from pyspark.sql.functions import *

parsedData = spark.read.format("avro").load(rawpath). \
  selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
 .select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
 .select("EnqueuedTimeUtc", "data.*")

推荐阅读