Generate a UUID column with a UDF, then split into two dataframes that share the UUID column

Problem description

I have a dataframe containing information about people, with first_name, last_name, and email_addresses columns, where email_addresses is a comma-separated list of email addresses. The data looks like this:

first_name  last_name  email_addresses
Jane        Doe        jane.doe@example.com,doe.jane@example.com
John        Smith      john.smith@example.com,smith.john@example.com

I'm trying to split this into two dataframes by first adding a person_id column populated with UUIDs via a UDF, and then building a second dataframe by running split and explode on the email_addresses column.

My code looks like this:

from uuid import uuid4
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Non-deterministic UDF that returns a fresh UUID string per invocation
uuid_udf = F.udf(lambda: str(uuid4()), StringType()).asNondeterministic()

df = ingest_data_from_csv() # Imagine this function populates a dataframe from a CSV

df_with_id = df.withColumn("person_id", uuid_udf())

person_df = df_with_id.drop("email_addresses")

email_df = (
    df_with_id.drop("first_name", "last_name")
    .withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
    .drop("email_addresses")
)

I assumed this would give me two dataframes with a foreign-key-style relationship between them, looking like this:

person_id                             first_name  last_name
00000000-0000-0000-0000-000000000001  Jane        Doe
00000000-0000-0000-0000-000000000002  John        Smith

person_id                             email_address
00000000-0000-0000-0000-000000000001  jane.doe@example.com
00000000-0000-0000-0000-000000000001  doe.jane@example.com
00000000-0000-0000-0000-000000000002  john.smith@example.com
00000000-0000-0000-0000-000000000002  smith.john@example.com

But what I actually end up with is a different set of person_id values in each dataframe, like this:

person_id                             first_name  last_name
00000000-0000-0000-0000-000000000001  Jane        Doe
00000000-0000-0000-0000-000000000002  John        Smith

person_id                             email_address
00000000-0000-0000-0000-000000000003  jane.doe@example.com
00000000-0000-0000-0000-000000000004  doe.jane@example.com
00000000-0000-0000-0000-000000000005  john.smith@example.com
00000000-0000-0000-0000-000000000006  smith.john@example.com

Is there a way to achieve this kind of "foreign key" relationship with PySpark?

Tags: python, dataframe, apache-spark, pyspark

Solution


You can use Spark's built-in sha2 function, creating a hash from first_name and last_name. Note that I also mixed monotonically_increasing_id into the hash input: if two people share the same first_name and last_name, they should still end up with distinct hash values. This also fixes the original symptom: uuid4 in a non-deterministic UDF returns a fresh random value every time the plan is re-evaluated, so each derived dataframe got its own set of IDs, whereas monotonically_increasing_id depends only on the partition layout and reproduces the same values when the plan is recomputed over unchanged input.
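To make the failure mode concrete, here is a minimal sketch (my addition, assuming an active SparkSession named spark): because the UDF is non-deterministic, every action re-runs the plan and produces a fresh set of UUIDs.

from uuid import uuid4
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Non-deterministic UDF: re-evaluated on every computation of the plan
uuid_udf = F.udf(lambda: str(uuid4()), StringType()).asNondeterministic()

demo = spark.createDataFrame([("Jane",), ("John",)], ["first_name"])
with_id = demo.withColumn("person_id", uuid_udf())

with_id.show(truncate=False)  # one set of UUIDs
with_id.show(truncate=False)  # typically a different set: the UDF ran again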

Code:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, sha2, concat

# Read the sample data; "header" treats the first CSV row as column names
df = spark.read.option("header", "true").csv("D:/DataAnalysis/person.csv")
df.show(truncate=False)
+----------+---------+---------------------------------------------+
|first_name|last_name|email_addresses                              |
+----------+---------+---------------------------------------------+
|Jane      |Doe      |jane.doe@example.com,doe.jane@example.com    |
|John      |Smith    |john.smith@example.com,smith.john@example.com|
+----------+---------+---------------------------------------------+

# Hash a per-row id together with the name columns so that two people with
# identical names still get distinct keys. monotonically_increasing_id()
# returns a long, so cast it to string before concatenating.
df_with_id = df.withColumn(
    "uid",
    sha2(concat(F.monotonically_increasing_id().cast("string"),
                col("first_name"), col("last_name")), 256),
)

df_with_id.show(truncate=False)
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|first_name|last_name|email_addresses                              |uid                                                             |
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|Jane      |Doe      |jane.doe@example.com,doe.jane@example.com    |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John      |Smith    |john.smith@example.com,smith.john@example.com|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
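One caveat the original answer does not mention: monotonically_increasing_id values depend on how the input is partitioned, so if the plan is recomputed over a re-shuffled or changed input, the uids can shift too. A defensive sketch is to cache df_with_id before deriving the two dataframes:

# Materialize df_with_id once so person_df and email_df reuse the same uids
df_with_id = df_with_id.cache()
df_with_id.count()  # force evaluation so the cache is populated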

person_df = df_with_id.drop("email_addresses")

person_df.show(truncate=False)
+----------+---------+----------------------------------------------------------------+
|first_name|last_name|uid                                                             |
+----------+---------+----------------------------------------------------------------+
|Jane      |Doe      |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John      |Smith    |001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+----------------------------------------------------------------+

# Split the comma-separated list and explode it into one row per address
email_df = (
    df_with_id.drop("first_name", "last_name")
    .withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
    .drop("email_addresses")
)

email_df.show(truncate=False)
+----------------------------------------------------------------+----------------------+
|uid                                                             |email_address         |
+----------------------------------------------------------------+----------------------+
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|jane.doe@example.com  |
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|doe.jane@example.com  |
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|john.smith@example.com|
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|smith.john@example.com|
+----------------------------------------------------------------+----------------------+   
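To confirm that the two dataframes behave like a foreign-key pair, you can join them back together on uid and check that every email address lines up with its person:

(person_df
    .join(email_df, on="uid", how="inner")
    .select("first_name", "last_name", "email_address")
    .show(truncate=False))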
