python - Pyspark - 保留收集列表的顺序并在多列上收集集合
问题描述
我有以下 pyspark 数据框。
Column_1 Column_2 Column_3 Column_4
1 A U1 12345
1 A A1 549BZ4G
预期输出:
按第 1 列和第 2 列分组。收集第 3 列和第 4 列集合,同时保留输入数据框中的顺序。它应该与输入的顺序相同。第 3 列和第 4 列之间的排序没有依赖性。两者都必须保留输入数据帧的排序
Column_1 Column_2 Column_3 Column_4
1 A U1,A1 12345,549BZ4G
到目前为止我尝试了什么:
我首先尝试使用窗口方法。我按第 1 列和第 2 列分区,按第 1 列和第 2 列排序。然后我按第 1 列和第 2 列分组,并在第 3 列和第 4 列上进行了收集集。
我没有得到预期的输出。我的结果如下。
Column_1 Column_2 Column_3 Column_4
1 A U1,A1 549BZ4G,12345
我还尝试使用单调递增的 id 创建索引,然后按索引排序,然后进行 group by 和 collect set 以获取输出。但仍然没有运气。
是因为字母数字和数值吗?如何保留第 3 列和第 4 列的顺序,因为它在输入中没有改变顺序。
解决方案
使用monotically_increasing_id
spark 中的函数来维护订单。您可以在此处找到有关它的更多信息
#InputDF
# +----+----+----+-------+
# |col1|col2|col3| col4|
# +----+----+----+-------+
# | 1| A| U1| 12345|
# | 1| A| A1|549BZ4G|
# +----+----+----+-------+
df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("Col1", "col2").agg(F.collect_list("col4").alias("Col4"),F.collect_list("col3").alias("Col3"))
df1.select("col1", "col2",F.array_join("col3", ",").alias("col3"),F.array_join("col4", ",").alias("col4")).show()
# OutputDF
# +----+----+-----+-------------+
# |col1|col2| col3| col4|
# +----+----+-----+-------------+
# | 1| A|U1,A1|12345,549BZ4G|
# +----+----+-----+-------------+
array_distinct
在顶部使用collect_list
具有不同的值并保持秩序。
#InputDF
# +----+----+----+-------+
# |col1|col2|col3| col4|
# +----+----+----+-------+
# | 1| A| U1| 12345|
# | 1| A| A1|549BZ4G|
# | 1| A| U1|123456 |
# +----+----+----+-------+
df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("Col1", "col2").agg(
F.array_distinct(F.collect_list("col4")).alias("Col4"),F.array_distinct(F.collect_list("col3")).alias("Col3"))
df1.select("col1", "col2", F.array_join("col3", ",").alias("col3"), F.array_join("col4", ",").alias("col4")).show(truncate=False)
# +----+----+-----+---------------------+
# |col1|col2|col3 |col4 |
# +----+----+-----+---------------------+
# |1 |A |U1,A1|12345,549BZ4G,123456 |
# +----+----+-----+---------------------+
推荐阅读
- python - Apache Airflow - 连接到 AWS S3 错误
- java - 我如何获得在 main 中使用的方法的值
- firebase - 项目之间的 Firebase 迁移 - 导入命令时目标 Fire Store 数据库不会更改(未显示错误)
- hyperledger-fabric - 是否可以在 levelDB 超级账本结构、Javascript 上添加表
- ros - 我在设置 Ros 主从时遇到问题
- c# - MySQL C# 连接错误,无法连接
- python - 我试图掩盖熊猫的日期,之前和之后
- kendo-grid - Kendo Grid 导出选中的行
- javascript - jQuery fadeIn() 反复调用不起作用
- javascript - Discord.JS TypeError:尝试添加角色时无法读取未定义的属性“角色”