python - 使用 UDF 生成 UUID 列,然后拆分为具有公共 UUID 列的两个数据框
问题描述
我有一个数据框,其中包含有关人员的信息,其中包含、 和列first_name
,其中是逗号分隔的电子邮件地址列表。数据看起来像这样:last_name
email_addresses
email_addresses
名 | 姓 | 电子邮件地址 |
---|---|---|
简 | 能源部 | jane.doe@example.com,doe.jane@example.com |
约翰 | 史密斯 | john.smith@example.com,smith.john@example.com |
我试图通过首先person_id
使用 UDF 添加一个填充有 UUID 的列,然后通过在列上执行split
和创建一个新的数据框来将其拆分为两个数据框。explode
email_addresses
我的代码看起来像这样:
from uuid import uuid4
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
uuid_udf = F.udf(lambda: str(uuid4()), StringType()).asNondeterministic()
df = ingest_data_from_csv() # Imagine this function populates a dataframe from a CSV
df_with_id = df.withColumn("person_id", uuid_udf())
person_df = df_with_id.drop("email_addresses")
email_df = (
df_with_id.drop("first_name", "last_name")
.withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
.drop("email_addresses")
我认为这会给我两个具有外键类型关系的数据框,看起来像这样:
person_id | 名 | 姓 |
---|---|---|
00000000-0000-0000-0000-000000000001 | 简 | 能源部 |
00000000-0000-0000-0000-000000000002 | 约翰 | 史密斯 |
person_id | 电子邮件地址 |
---|---|
00000000-0000-0000-0000-000000000001 | jane.doe@example.com |
00000000-0000-0000-0000-000000000001 | doe.jane@example.com |
00000000-0000-0000-0000-000000000002 | john.smith@example.com |
00000000-0000-0000-0000-000000000002 | smith.john@example.com |
但我最终得到的是所有person_id
列中的不同值,看起来像这样:
person_id | 名 | 姓 |
---|---|---|
00000000-0000-0000-0000-000000000001 | 简 | 能源部 |
00000000-0000-0000-0000-000000000002 | 约翰 | 史密斯 |
person_id | 电子邮件地址 |
---|---|
00000000-0000-0000-0000-000000000003 | jane.doe@example.com |
00000000-0000-0000-0000-000000000004 | doe.jane@example.com |
00000000-0000-0000-0000-000000000005 | john.smith@example.com |
00000000-0000-0000-0000-000000000006 | smith.john@example.com |
有没有办法用pyspark实现这种“外键”类型的关系?
解决方案
您可以通过创建和sha2
的哈希值来使用 Spark 的功能。请注意,我还添加了创建哈希值。我之所以使用这个,是因为如果和相似,那么我们不应该有相似的哈希值。first_name
last_name
monotonically_increasing_id
first_name
last_name
代码:
from uuid import uuid4
from pyspark.sql import functions as F
df = spark.read.option("header","true").csv("D:/DataAnalysis/person.csv")
df.show(truncate=False)
+----------+---------+---------------------------------------------+
|first_name|last_name|email_addresses |
+----------+---------+---------------------------------------------+
|Jane |Doe |jane.doe@example.com,doe.jane@example.com |
|John |Smith |john.smith@example.com,smith.john@example.com|
+----------+---------+---------------------------------------------+
from pyspark.sql.functions import col, sha2, concat
df_with_id = df.withColumn("uid", sha2(concat(F.monotonically_increasing_id(),col("first_name"), col("last_name")), 256))
df_with_id.show(truncate=False)
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|first_name|last_name|email_addresses |uid |
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|Jane |Doe |jane.doe@example.com,doe.jane@example.com |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John |Smith |john.smith@example.com,smith.john@example.com|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
person_df = df_with_id.drop("email_addresses")
person_df.show(truncate=False)
+----------+---------+----------------------------------------------------------------+
|first_name|last_name|uid |
+----------+---------+----------------------------------------------------------------+
|Jane |Doe |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John |Smith |001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+----------------------------------------------------------------+
email_df = (
df_with_id.drop("first_name", "last_name")
.withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
.drop("email_addresses")
)
email_df.show(truncate=False)
+----------------------------------------------------------------+----------------------+
|uid |email_address |
+----------------------------------------------------------------+----------------------+
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|jane.doe@example.com |
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|doe.jane@example.com |
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|john.smith@example.com|
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|smith.john@example.com|
+----------------------------------------------------------------+----------------------+
推荐阅读
- python - 烧瓶 sqlite3 安全问题
- python - 根据条件更新 Numpy 数组
- dynamic - 如何从目录中获取文件列表?
- materialize - 如何在物化中使用 vanilla java-script 初始化轮播全角和指标?
- django - Django Admin中删除操作的奇怪错误
- angular7 - 会话过期时重定向到登录页面
- javascript - 使用 javascript 删除 django formset
- typescript - 编译打字稿时找不到名称“符号”
- android - 搜索栏进度动画?
- ios - 调用 invalidateLayout() 或 reloadData() 以在设备旋转后重新加载从情节提要创建的 UICollectionView