首页 > 解决方案 > 如何使用 Databricks 集群(Scala)将数据从 Eventhub 摄取到 ADLS

问题描述

我想以指定格式将流数据从 Eventhub 摄取到 ADLS gen2。

我做了从 DB 到 ADLS 和 Container 到 Container 的批量数据摄取,但现在我想尝试流式数据摄取。

请您指导我从哪里开始进行下一步。我确实在 Azure 中创建了 Eventhub、Databrick 实例和存储帐户。

标签: apache-sparkazure-databricksspark-structured-streamingazure-eventhubazure-data-lake-gen2

解决方案


您只需遵循 EventHubs Spark 连接器的文档(针对 ScalaPython)。以最简单的方式,代码如下所示(对于 Python):

readConnectionString = "..."
ehConf = {}
# this is required for versions 2.3.15+
ehConf['eventhubs.connectionString']=sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)

df = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

# casting binary payload to String (but it's really depends on the 
# data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))

# write data to storage
stream = cdf.writeStream.format("delta")\
  .outputMode("append")\
  .option("checkpointLocation", "/path/to/checkpoint/directory")\
  .start("ADLS location")

您可能需要添加其他选项,例如起始位置等,但文档中对所有内容都有很好的描述


推荐阅读