首页 > 解决方案 > 如何使用来自另一个数据帧的新值更新 pyspark 数据帧?

问题描述

我有两个火花数据框:

数据框A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据框B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

数据框 B 可以包含来自数据框 A 的重复、更新和新行。我想在 spark 中编写一个操作,我可以在其中创建一个新数据框,其中包含来自数据框 A 的行以及来自数据框 B 的更新行和新行。

我首先创建了一个仅包含不可更新列的哈希列。这是唯一的标识。col1因此,假设col2可以更改值(可以更新),但是col3,..,coln是唯一的。我创建了一个哈希函数hash(col3,..,coln)

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在我想编写一些 spark 代码,基本上从 B 中选择散列不在 A 中的行(所以新行和更新的行)并将它们与来自 A 的行一起加入一个新的数据帧中。我怎样才能做到这一点派斯帕克?

编辑:数据框 B 可以有来自数据框 A 的额外列,因此无法进行联合。

示例示例

数据框A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据框 B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果:数据框 C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

标签: pythonpyspark

解决方案


这与使用新值更新数据框列密切相关,除了您还想从 DataFrame B 添加行。一种方法是首先执行链接问题中概述的操作,然后将结果与 DataFrame B 合并并删除重复。

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

或者如果您有很多列要替换并且您不想对它们全部进行硬编码,则更一般地使用列表推导:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()

推荐阅读