apache-spark - 每天使用 Spark 结构化流将 Azure EventHub 数据流式传输到 ADLS/Blob
问题描述
我需要每天将事件从 Azure EventHub(EH) 流式传输到 blob。我已经使用带有 Trigger.once() 选项的 Spark 结构化流以及 spark 检查点来实现这一点,这项工作非常好。但是我需要进一步的是,当我每天使用 Trigger.once() 选项运行 Spark 作业时,我需要每天在 blob 中创建一个文件夹(“YYYY-MM-DD”)并转储所有事件/特定日期在 EH 中可用的数据。
为了实现这一点,我使用了 EH 模式中可用的 column='enqueuedTime',如下所示,并在将数据写入 blob 时使用了 partitionBy。
withColumn("year", year(to_date($"enqueuedTime", "MM/dd/yyyy"))).withColumn("month", month(to_date($"enqueuedTime", "MM/dd/yyyy")))
batchSignalsFinalDF
.writeStream
.format("JSON")
.partitionBy("year", "month", "day")
.option("checkpointLocation", "PATH IN BLOB")
.outputMode(OutputMode.Append).trigger(Trigger.Once())
.start("PATH IN BLOB")
这似乎不适用于我的一个用例,是否有更好的方法来实现这一点!,任何建议都将不胜感激。
解决方案
推荐阅读
- javascript - 如何从 HTML 元素中获取换行符
- dji-sdk - 是否可以从其他 iOS 应用程序打开 DJI GO?
- python - 如何将高斯矩阵转换为概率矩阵
- c# - 匹配“Double, Double, Double, Double”字符串
- javascript - 从对象 JS 中检索值
- c# - 使用 Include 时,Entity Framework Core 未加载实体
- drools - 流口水开发和生产分离很好
- kdb - 如何将数组传递到更新查询 kdb
- javascript - Angular 2 OnPush 检测策略
- jquery - jQuery SlideUp 和 SlideDown 无法正常工作