首页 > 解决方案 > 根据 PySpark Dataframe 中两列之间的匹配分配唯一 ID

问题描述

我想为数据框中的列分配一个自动增量唯一 ID。

如果 column1 值与 column2 值匹配,则标志将启用为 true,并且对于所有这些匹配,我们需要分配相同的 ID。如果 column1 值与 column2 值不匹配,则标志将为 False,我们需要为该 column1 值提供唯一 ID。

输入df

ID 第 1 列 第 2 列 旗帜
空值 1 2 真的
空值 1 3 真的
空值 2 1 真的
空值 2 3 真的
空值 3 1 真的
空值 3 2 真的
空值 4 错误的
空值 5 错误的
空值 6 7 真的
空值 7 6 真的
空值 9 2 真的
空值 1 9 真的
空值 3 9 真的
空值 2 9 真的
空值 8 错误的

输出df

这里 column1 值 1、2、3 和 9 形成匹配,因此我们为所有这 4 个值(101)分配一个唯一 ID,column1 值 4 不匹配,因此我们分配下一个唯一 ID(102),column1 值 5 是也不匹配,所以我们分配下一个唯一 ID(103),column1 值 6 和 7 是匹配的,所以我们为 2 个值(104)分配相同的唯一 ID,column1 值 8 与 column2 值中的任何一个都不匹配,所以我们分配下一个唯一 ID 105

ID 第 1 列
101 1
101 2
101 3
102 4
103 5
104 6
104 7
101 9
105 8

标签: apache-sparkpysparkapache-spark-sql

解决方案


我把我的解释放在代码中。请注意在Window不分区数据的情况下使用,此操作会将您的所有数据移动到单个节点。

from pyspark.sql.window import Window
import pyspark.sql.functions as f

# [...] Your dataframe initialization

# Creating an index to retrieve original dataframe at the end
df = df.withColumn('index', f.monotonically_increasing_id())

w = Window.orderBy('least')

# Creating a column with least value from `Column1` and `Column2`. This will be used to "group" the values that must have the same ID
df = df.withColumn('least', f.least(f.col('Column1'), f.col('Column2')))

# Check if the current or previous `flag` is false to increase the id
df = df.withColumn('increase', ((~f.col('flag')) | (~f.lag('flag', default=True).over(w))).cast('int'))

# Generating incremental id
df = df.withColumn('ID', f.lit(101) + f.sum('increase').over(w))

(df
 .select('ID', 'Column1')
 .drop_duplicates()
 .sort('index')
 .show(truncate=False))

输出

+---+-------+
|ID |Column1|
+---+-------+
|101|1      |
|101|2      |
|101|3      |
|102|4      |
|103|5      |
|104|6      |
|104|7      |
|101|9      |
|105|8      |
+---+-------+

推荐阅读