首页 > 解决方案 > 从大数据集中删除模糊重复

问题描述

我有一个包含许多记录的 CSV 文件。每条记录代表某个人。它具有first_namelast_name和其他详细信息。我的目标是以聪明的方式从这些数据中删除重复项。记录来自不同的来源,副本可能包含不同的信息。我在下面的示例中简化了问题:

df = spark.createDataFrame(
    [
        (1, "Anna", "Smyth", "01/03", "NY"), 
        (2, "Anna", "Smyth", "01/03", ""), 
        (3, "Anna", "Smyth", "01/03", "NY"),         
        (4, "Max", "Anderson", "12/04", "Boston"), 
        (5, "Max", "Anderson", "", "London"), 
        (6, "Max", "Anderson", "06/07", ""),         
        (7, "Sarah", "Nicolson", "02/09", ""), 
        (8, "Sarah", "Jonson", "", "Mexico"), 
        (9, "Sarah", "Jonson", "01/08", "Dallas"), 
    ],
    ("id", "first_name", "last_name", "birthday", "city")
)
df.show()

+---+----------+---------+--------+------+
| id|first_name|last_name|birthday|  city|
+---+----------+---------+--------+------+
|  1|      Anna|    Smyth|   01/03|    NY|
|  2|      Anna|    Smyth|   01/03|      |
|  3|      Anna|    Smyth|   01/03|    NY|
|  4|       Max| Anderson|   12/04|Boston|
|  5|       Max| Anderson|        |London|
|  6|       Max| Anderson|   06/07|      |
|  7|     Sarah| Nicolson|   02/09|      |
|  8|     Sarah|   Jonson|        |Mexico|
|  9|     Sarah|   Jonson|   01/08|Dallas|
+---+----------+---------+--------+------+

我想按first_nameand对记录进行分组last_name,然后进行一些比较以考虑记录是否重复。如果很少有记录具有相同的first_nameand last_name,我想检查birthdayand,如果它等于 - 它是重复的。如果一个或记录已birthday填充,另一个 - 不是,它是重复的。如果两个(或更多)记录都为空birthday- 它是重复的。相比之下,我忽略city了字段,但是在考虑重复项时,我想留下“最富有”的记录,即填充了更多字段的记录。如果记录具有相同的名称但不同的生日 - 它不是重复的。

例如,上面,我想得到:

+---+----------+---------+--------+------+
| id|first_name|last_name|birthday|  city|
+---+----------+---------+--------+------+
|  1|      Anna|    Smyth|   01/03|    NY|
|  4|       Max| Anderson|   12/04|Boston|
|  6|       Max| Anderson|   06/07|      |
|  7|     Sarah| Nicolson|   02/09|      |
|  9|     Sarah|   Jonson|   01/08|Dallas|
+---+----------+---------+--------+------+

在真正的问题中,我有更多的字段 - 大约 70 个,其中一些应该是必须匹配的,有些 - 不是。我需要处理的记录数量 - 大约 1 亿条。我正在考虑使用 pyspark,但欢迎使用任何技术

标签: dataframeapache-sparkpysparkapache-spark-sqlpyspark-dataframes

解决方案


首先,您可以通过连接由分区 [ 中的 max 替换的列来为每一行创建组列,如果grp它是null 或空字符串:first_name#last_name#birthdaybirthdayfirst_namelast_name]

from pyspark.sql import Window
from pyspark.sql import functions as F

w1 = Window.partitionBy("first_name", "last_name").orderBy()

df1 = df.withColumn(
    "grp",
    F.concat_ws(
        "#",
        "first_name",
        "last_name",
        F.coalesce(F.expr("nullif(birthday, '')"), F.max("birthday").over(w1))
    )
).withColumn(
    "rich_columns",
    F.array(
        *[F.col(c) for c in df.columns if c not in ["id", "first_name", "last_name"]]
    )
)

df1.show(truncate=False)
#+---+----------+---------+--------+------+--------------------+---------------+
#|id |first_name|last_name|birthday|city  |grp                 |rich_columns   |
#+---+----------+---------+--------+------+--------------------+---------------+
#|1  |Anna      |Smyth    |01/03   |NY    |Anna#Smyth#01/03    |[01/03, NY]    |
#|2  |Anna      |Smyth    |01/03   |      |Anna#Smyth#01/03    |[01/03, ]      |
#|3  |Anna      |Smyth    |01/03   |NY    |Anna#Smyth#01/03    |[01/03, NY]    |
#|7  |Sarah     |Nicolson |02/09   |      |Sarah#Nicolson#02/09|[02/09, ]      |
#|8  |Sarah     |Jonson   |        |Mexico|Sarah#Jonson#01/08  |[, Mexico]     |
#|9  |Sarah     |Jonson   |01/08   |Dallas|Sarah#Jonson#01/08  |[01/08, Dallas]|
#|4  |Max       |Anderson |12/04   |Boston|Max#Anderson#12/04  |[12/04, Boston]|
#|5  |Max       |Anderson |        |London|Max#Anderson#12/04  |[, London]     |
#|6  |Max       |Anderson |06/07   |      |Max#Anderson#06/07  |[06/07, ]      |
#+---+----------+---------+--------+------+--------------------+---------------+

birthdaycity还用于创建一个数组列rich_columns,该列将用于对最大信息不为空/空的行进行优先级排序。

然后,使用上面创建的组列执行行号并按rich_columns数组大小排序:

w2 = Window.partitionBy("grp").orderBy(
    F.expr("size(filter(rich_columns, x -> nullif(x, '') is not null))").desc()
)

df2 = df1.withColumn("rn", F.row_number().over(w2)) \
    .filter("rn = 1") \
    .drop("grp", "rn", "rich_columns")

df2.show()

#+---+----------+---------+--------+------+
#| id|first_name|last_name|birthday|  city|
#+---+----------+---------+--------+------+
#|  7|     Sarah| Nicolson|   02/09|      |
#|  1|      Anna|    Smyth|   01/03|    NY|
#|  9|     Sarah|   Jonson|   01/08|Dallas|
#|  6|       Max| Anderson|   06/07|      |
#|  4|       Max| Anderson|   12/04|Boston|
#+---+----------+---------+--------+------+

在实际应用程序中,您应该在执行此操作之前将 columns last_name、转换first_name为大写并去除重音符号(如果有)。


推荐阅读