python - 如何使用来自另一个数据帧的新值更新 pyspark 数据帧?
问题描述
我有两个火花数据框:
数据框A:
|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |
和数据框B:
|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |
数据框 B 可以包含来自数据框 A 的重复、更新和新行。我想在 spark 中编写一个操作,我可以在其中创建一个新数据框,其中包含来自数据框 A 的行以及来自数据框 B 的更新行和新行。
我首先创建了一个仅包含不可更新列的哈希列。这是唯一的标识。col1
因此,假设col2
可以更改值(可以更新),但是col3,..,coln
是唯一的。我创建了一个哈希函数hash(col3,..,coln)
:
A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))
现在我想编写一些 spark 代码,基本上从 B 中选择散列不在 A 中的行(所以新行和更新的行)并将它们与来自 A 的行一起加入一个新的数据帧中。我怎样才能做到这一点派斯帕克?
编辑:数据框 B 可以有来自数据框 A 的额外列,因此无法进行联合。
示例示例
数据框A:
+-----+-----+
|col_1|col_2|
+-----+-----+
| a| www|
| b| eee|
| c| rrr|
+-----+-----+
数据框 B:
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
| a| wew| 1|
| d| yyy| 2|
| c| rer| 3|
+-----+-----+-----+
结果:数据框 C:
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
| a| wew| 1|
| b| eee| null|
| c| rer| 3|
| d| yyy| 2|
+-----+-----+-----+
解决方案
这与使用新值更新数据框列密切相关,除了您还想从 DataFrame B 添加行。一种方法是首先执行链接问题中概述的操作,然后将结果与 DataFrame B 合并并删除重复。
例如:
dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
.select(
'col_1',
f.when(
~f.isnull(f.col('b.col_2')),
f.col('b.col_2')
).otherwise(f.col('a.col_2')).alias('col_2'),
'b.col_3'
)\
.union(dfB)\
.dropDuplicates()\
.sort('col_1')\
.show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#| a| wew| 1|
#| b| eee| null|
#| c| rer| 3|
#| d| yyy| 2|
#+-----+-----+-----+
或者如果您有很多列要替换并且您不想对它们全部进行硬编码,则更一般地使用列表推导:
cols_to_update = ['col_2']
dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
.select(
*[
['col_1'] +
[
f.when(
~f.isnull(f.col('b.{}'.format(c))),
f.col('b.{}'.format(c))
).otherwise(f.col('a.{}'.format(c))).alias(c)
for c in cols_to_update
] +
['b.col_3']
]
)\
.union(dfB)\
.dropDuplicates()\
.sort('col_1')\
.show()