首页 > 解决方案 > 每天使用 Spark 结构化流将 Azure EventHub 数据流式传输到 ADLS/Blob

问题描述

我需要每天将事件从 Azure EventHub(EH) 流式传输到 blob。我已经使用带有 Trigger.once() 选项的 Spark 结构化流以及 spark 检查点来实现这一点,这项工作非常好。但是我需要进一步的是,当我每天使用 Trigger.once() 选项运行 Spark 作业时,我需要每天在 blob 中创建一个文件夹(“YYYY-MM-DD”)并转储所有事件/特定日期在 EH 中可用的数据。

为了实现这一点,我使用了 EH 模式中可用的 column='enqueuedTime',如下所示,并在将数据写入 blob 时使用了 partitionBy。

withColumn("year", year(to_date($"enqueuedTime", "MM/dd/yyyy"))).withColumn("month", month(to_date($"enqueuedTime", "MM/dd/yyyy")))

batchSignalsFinalDF
  .writeStream
  .format("JSON")
  .partitionBy("year", "month", "day")
  .option("checkpointLocation", "PATH IN BLOB")
  .outputMode(OutputMode.Append).trigger(Trigger.Once())
  .start("PATH IN BLOB")

这似乎不适用于我的一个用例,是否有更好的方法来实现这一点!,任何建议都将不胜感激。

标签: apache-sparkspark-structured-streamingazure-eventhubazure-blob-storage

解决方案


推荐阅读