apache-spark - 如何使用 Databricks 集群(Scala)将数据从 Eventhub 摄取到 ADLS
问题描述
我想以指定格式将流数据从 Eventhub 摄取到 ADLS gen2。
我做了从 DB 到 ADLS 和 Container 到 Container 的批量数据摄取,但现在我想尝试流式数据摄取。
请您指导我从哪里开始进行下一步。我确实在 Azure 中创建了 Eventhub、Databrick 实例和存储帐户。
解决方案
您只需遵循 EventHubs Spark 连接器的文档(针对 Scala和Python)。以最简单的方式,代码如下所示(对于 Python):
readConnectionString = "..."
ehConf = {}
# this is required for versions 2.3.15+
ehConf['eventhubs.connectionString']=sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(readConnectionString)
df = spark.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
# casting binary payload to String (but it's really depends on the
# data format inside the topic)
cdf = df.withColumn("body", F.col("body").cast("string"))
# write data to storage
stream = cdf.writeStream.format("delta")\
.outputMode("append")\
.option("checkpointLocation", "/path/to/checkpoint/directory")\
.start("ADLS location")
您可能需要添加其他选项,例如起始位置等,但文档中对所有内容都有很好的描述。
推荐阅读
- batch-file - 如何使用 cmd 从文本文件中插入密码?
- matlab - 重写脚本并将“内核”分布图拟合到直方图中
- apache-camel - Camel:是否可以在给定时间使用计时器组件来安排日常任务?
- angular - __importDefault 未定义 Ionic
- linux - tmux pin 以避免滚动
- c++ - 如何在linux ubuntu下链接犰狳给出链接错误
- python-2.7 - 如何在 Python 中从 .txt 连接?
- c# - 正确处理变量构造函数参数
- ios - 发送群组短信 iOS Swift - 如何超越运营商每次发送尝试 9 - 20 条消息的限制
- python - Python虚拟环境`venv`找不到`sqlite3`模块