apache-spark - 如何使用 pyspark 更新大型 SQL 表?
问题描述
我正在使用pyspark
in构建一个 etl databricks
。
我有一个源 SQL 表,其中包含大约 1000 万行数据,我想将其加载到 SQL 临时表中。
我有两个基本要求:-
- 当一行被添加到源表时,它必须插入到临时表中。
- 当一行更新到源表时,它必须更新到临时表中。
源数据
值得庆幸的是,源表有两个时间戳列,用于创建和更新时间。我可以使用这两列查询新的和更新的数据,并将其放入一个dataframe
名为 source_df 的文件中。
目标数据
我将暂存表中的所有键(ID)加载到一个dataframe
名为 target_df 的文件中。
制定改变
我根据键将两者dataframe
结合在一起,以确定哪些行已经存在(哪些表单更新),哪些行不存在(哪些表单插入)。这给了我两个新dataframes
的 inserts_df 和 updates_df。
插入新行
这很容易,因为我可以使用 inserts_df.write 直接写入临时表。任务完成!
更新现有行
这是我无法弄清楚的,因为现有示例的方式很少。我相信你不能使用pyspark
. 我可以使用“覆盖”模式来替换 SQL 表,但是当我只想更新半打时,替换 1000 万行并没有多大意义。
如何在不覆盖整个表的情况下有效地将 updates_df 中的行获取到 SQL 中?
解决方案
推荐阅读
- kubernetes - 如何找到我的持久卷位置
- oracle - 在生产中修复 Oracle 查询性能的最佳方法是什么?更改应用程序代码?使用计划基线?
- highcharts - 如何在highcharts地图上将区域(城市)划分为子区域(子城市)
- python-3.x - 使用 f 字符串固定所有列表值的小数点后的数字或使用 f 字符串的漂亮打印列表
- javascript - 有没有办法用分隔符或类似于正则表达式的东西将文本文件上传到谷歌表格来填充单元格
- redux - 如何使用基于静态过滤值的重新选择 redux 生成过滤列表?
- swift - Firebase 登录未运行
- c++ - 我无法理解该程序中缓冲区之间的相关性
- java - Jaxb Unmarshall SOAP 信封
- python - 制作虚拟环境时如何修复“IOError:[Errno 2] No such file or directory”