apache-spark - Spark Structured Streaming foreachBatch 和 UPSERT(合并):坚持还是不坚持?
问题描述
如果使用 foreachBatch 在结构化流中进行状态聚合(任意)以将更新合并到增量表中,我是否应该在更新插入之前将批处理数据帧保留在 foreachBatch 中?
似乎不需要持久化,因为我正在写入单个数据接收器。
另一方面,我有强烈的感觉,不坚持会导致源重新扫描并触发聚合两次。
有什么意见/想法吗?
foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
deltaTable.as("table").merge(batchDf.as("updates"), functions.expr("table.id=updates.id"))
.whenNotMatched().insertAll() // new session to be added
.whenMatched()
.updateAll()
.execute())
解决方案
所以 delta-users ( https://groups.google.com/g/delta-users/c/Ihm6PMilCdI ) 的答案是:
DeltaTable.merge (upsert) 对源数据进行两次传递。
因此,如果您确实关心 Arbitrary Stateful Aggregation 中的 Spark 指标或日志mapGroupsWithState
/ flatmapGroupsWithState
- 在 merge inside 之前执行持久化/缓存foreachBatch
,否则发送的指标将具有双 (x2) 值,并且日志聚合日志将被发出两次
推荐阅读
- c - 为什么我们不能将地址存储在普通的 int 变量中?并通过分配我不希望它指向任何地方。
- database - Array handling in mongodb
- r - 显示每个方面而不是整体的摘要行
- tizen - 无法在 Tizen Studio 中下载新图像和创建模拟器
- c# - (&(objectClass=user)(|(displayName)) 搜索过滤器无效
- r - 如何使用 R 中的 WOE 和 IV 替换分类或连续变量中的 NA?
- c++ - Spirit 仅在似乎从 Lexer 获得第一个符号后无法解析
- java - 如何从 Android 更新 DynamoDB 中的项目?
- hadoop - 如何在启用 kerberos 的 HDFS 中创建非超级用户
- c++ - clang-7:错误:链接器命令失败,退出代码为 1