首页 > 解决方案 > Spark Structured Streaming foreachBatch 和 UPSERT(合并):坚持还是不坚持?

问题描述

如果使用 foreachBatch 在结构化流中进行状态聚合(任意)以将更新合并到增量表中,我是否应该在更新插入之前将批处理数据帧保留在 foreachBatch 中?

似乎不需要持久化,因为我正在写入单个数据接收器。

另一方面,我有强烈的感觉,不坚持会导致源重新扫描并触发聚合两次。

有什么意见/想法吗?

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> 
        deltaTable.as("table").merge(batchDf.as("updates"), functions.expr("table.id=updates.id"))
        .whenNotMatched().insertAll()           // new session to be added
        .whenMatched()
        .updateAll()
        .execute())

标签: apache-sparkapache-spark-sqlspark-structured-streaming

解决方案


所以 delta-users ( https://groups.google.com/g/delta-users/c/Ihm6PMilCdI ) 的答案是:

DeltaTable.merge (upsert) 对源数据进行两次传递。

因此,如果您确实关心 Arbitrary Stateful Aggregation 中的 Spark 指标或日志mapGroupsWithState/ flatmapGroupsWithState- 在 merge inside 之前执行持久化/缓存foreachBatch,否则发送的指标将具有双 (x2) 值,并且日志聚合日志将被发出两次


推荐阅读