Spark Structured Streaming gives me an error: org.apache.spark.sql.AnalysisException: 'foreachBatch' does not support partitioning;

Problem description

I wrote the following Structured Streaming code in Databricks to write to Azure Data Lake:

import org.apache.spark.sql.DataFrame

// Upsert each micro-batch into the "silver" Delta table, keeping only the
// latest record per smtUidNr (by msgTs) from the batch.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {

  microBatchOutputDF.createOrReplaceTempView("updates")

  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO silver AS r
    USING (
      SELECT smtUidNr, dcl, inv, evt, smt, msgTs, msgInfSrcCd
      FROM (
        SELECT smtUidNr, dcl, inv, evt, smt, msgTs, msgInfSrcCd,
               RANK()       OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) AS rank,
               ROW_NUMBER() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) AS row_num
        FROM updates
      )
      WHERE rank = 1 AND row_num = 1
    ) AS u
    ON u.smtUidNr = r.smtUidNr
    WHEN MATCHED AND u.msgTs > r.msgTs THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT *
  """)
}

splitDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("append")
  .partitionBy("year", "month", "day")
  .option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/")
  .start("abfss://dump@mcfdatalake.dfs.core.windows.net/main_data/")

When I try to run this, it gives me the following error:

org.apache.spark.sql.AnalysisException: 'foreachBatch' does not support partitioning;

What is the alternative to using foreachBatch with partitioning?

Tags: apache-spark, databricks, spark-structured-streaming, azure-databricks

Solution


What is the alternative to using foreachBatch with partitioning?

Use partitioning inside foreachBatch, as sketched below.
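A minimal sketch of how that can look, reusing the paths and columns from the question: drop partitionBy (and the output path) from the streaming writer, and let the batch function do the partitioned Delta write itself. The function name writePartitionedBatch is illustrative, not from the original code.

import org.apache.spark.sql.DataFrame

// Sketch: partitioning is applied to the batch writer inside foreachBatch,
// not to the streaming writer (which rejects partitionBy with foreachBatch).
def writePartitionedBatch(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  microBatchOutputDF.write
    .format("delta")
    .mode("append")
    .partitionBy("year", "month", "day")
    .save("abfss://dump@mcfdatalake.dfs.core.windows.net/main_data/")
}

splitDF.writeStream
  .foreachBatch(writePartitionedBatch _)
  .outputMode("append")
  .option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/")
  .start()   // no path or partitionBy here; foreachBatch owns the output

The same batch function can still run the MERGE from upsertToDelta before or instead of the plain append; the key point is that nothing partition-related remains on writeStream.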

You can also write each batch to a Delta table and run a separate query on that Delta table to merge it into another table; see the sketch below.
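A sketch of that variant, assuming a hypothetical staging location staging_data/ alongside the paths from the question: the stream appends partitioned data to a staging Delta table, and a separate scheduled query deduplicates it and merges it into silver.

// The stream writes plain partitioned appends to a staging Delta location
// (the staging_data/ path is hypothetical).
splitDF.writeStream
  .format("delta")
  .outputMode("append")
  .partitionBy("year", "month", "day")
  .option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/")
  .start("abfss://dump@mcfdatalake.dfs.core.windows.net/staging_data/")

// A separate batch or scheduled query merges the staging data into silver.
spark.sql("""
  MERGE INTO silver AS r
  USING (
    SELECT smtUidNr, dcl, inv, evt, smt, msgTs, msgInfSrcCd
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) AS row_num
      FROM delta.`abfss://dump@mcfdatalake.dfs.core.windows.net/staging_data/`
    )
    WHERE row_num = 1
  ) AS u
  ON u.smtUidNr = r.smtUidNr
  WHEN MATCHED AND u.msgTs > r.msgTs THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")

The trade-off is extra storage and latency for the staging table, in exchange for keeping the streaming writer's native partitioning.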
