apache-spark - Spark Structured Streaming 给我错误,因为 org.apache.spark.sql.AnalysisException: 'foreachBatch' 不支持分区;
问题描述
我在 Databricks 中设计了以下结构化流代码以写入 Azure Data Lake:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("updates")
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO silver as r
USING
(
SELECT smtUidNr, dcl, inv, evt, smt, msgTs,msgInfSrcCd
FROM (
SELECT smtUidNr, msgTs
, RANK() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) as rank
, ROW_NUMBER() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) as row_num
FROM updates
)
WHERE rank = 1 AND row_num = 1
)
as u
ON u.smtUidNr = r.smtUidNr
WHEN MATCHED and u.msgTs > r.msgTs THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")
}
splitDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("append").partitionBy("year","month","day").option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/").start("abfss://dump@mcfdatalake.dfs.core.windows.net/main_data/")
当我尝试执行此操作时,它给了我如下错误:
org.apache.spark.sql.AnalysisException: 'foreachBatch' does not support partitioning;
将 foreachBatch 与分区一起使用的替代方法是什么?
解决方案
将 foreachBatch 与分区一起使用的替代方法是什么?
在内部使用分区foreachBatch
。
您还可以将批处理写入 Delta 表并在 delta 表上运行单独的查询以将其与另一个表合并。
推荐阅读
- flutter - Flutter Widget Inspector 不断显示 vscode 中的负载
- sql - How to add auto increment id according to a group in VFP 9 and SQL Server
- node.js - 使用对象原型方法应用节点模块
- linker-errors - 在 linux 64 位中使用 delphi 链接单独的共享对象库时不支持 ELF 文件类型 2
- jmeter - 如何在网页测试中将事件合并到一个事务名称下?
- python - Web Scraping, API, HTTPError: HTTP Error 522: Origin Connection Time-out
- teamcity - TeamCity:如何设置正确的触发器文件通配符
- excel - 使用 Excel IF 函数自动更新连接工作表中的值
- javascript - 量角器 - 更新 chrome 版本后获取返回 null 的属性值
- python - 将 groupbys 附加到嵌套字典