首页 > 解决方案 > 如何使用 pyspark 更新大型 SQL 表?

问题描述

我正在使用pysparkin构建一个 etl databricks

我有一个源 SQL 表,其中包含大约 1000 万行数据,我想将其加载到 SQL 临时表中。

我有两个基本要求:-

  1. 当一行被添加到源表时,它必须插入到临时表中。
  2. 当一行更新到源表时,它必须更新到临时表中。

源数据

值得庆幸的是,源表有两个时间戳列,用于创建和更新时间。我可以使用这两列查询新的和更新的数据,并将其放入一个dataframe名为 source_df 的文件中。

目标数据

我将暂存表中的所有键(ID)加载到一个dataframe名为 target_df 的文件中。

制定改变

我根据键将两者dataframe结合在一起,以确定哪些行已经存在(哪些表单更新),哪些行不存在(哪些表单插入)。这给了我两个新dataframes的 inserts_df 和 updates_df。

插入新行

这很容易,因为我可以使用 inserts_df.write 直接写入临时表。任务完成!

更新现有行

这是我无法弄清楚的,因为现有示例的方式很少。我相信你不能使用pyspark. 我可以使用“覆盖”模式来替换 SQL 表,但是当我只想更新半打时,替换 1000 万行并没有多大意义。

如何在不覆盖整个表的情况下有效地将 updates_df 中的行获取到 SQL 中?

标签: apache-sparkpysparkapache-spark-sqldatabricks

解决方案


推荐阅读