python - 跨组的 Pyspark 示例数据框
问题描述
我有一个具有以下结构的数据框
+------+------+--------+------+--------------------+
|group1|group2|position|value1| value2|
+------+------+--------+------+--------------------+
| G1_A| G2_P| 0001| 6| 0.3217543124839014|
| G1_A| G2_P| 0002| 6| 0.4554057162820776|
| G1_A| G2_P| 0003| 8| 0.3801357655062654|
| G1_A| G2_P| 0004| 1| 0.8910865867971118|
| G1_A| G2_P| 0005| 5| 0.04929044804190086|
| G1_A| G2_Q| 0001| 7| 0.10188969920834146|
| G1_A| G2_Q| 0002| 9| 0.4900202258755447|
| G1_A| G2_Q| 0003| 9| 0.0570759385425319|
| G1_A| G2_Q| 0004| 0| 0.8638132568329479|
| G1_A| G2_Q| 0005| 8| 0.5631513545869068|
| G1_A| G2_R| 0001| 2| 0.18320914601531957|
| G1_A| G2_R| 0002| 0| 0.722470705002637|
| G1_A| G2_R| 0003| 6| 0.27988540796939354|
| G1_A| G2_R| 0004| 7| 0.13827103885498537|
| G1_A| G2_R| 0005| 6| 0.8410548211059407|
| G1_A| G2_S| 0001| 1| 0.6542679700270546|
| G1_A| G2_S| 0002| 9| 0.8858848000834335|
| G1_A| G2_S| 0003| 7| 0.5113964766224457|
| G1_A| G2_S| 0004| 9| 0.7758283878692317|
| G1_A| G2_S| 0005| 4|0.011421066938733127|
| G1_B| G2_P| 0001| 1| 0.6098855780360801|
| G1_B| G2_P| 0002| 8|0.009644056732163175|
| G1_B| G2_P| 0003| 1| 0.9216012386238513|
| G1_B| G2_P| 0004| 7| 0.8658947151731069|
| G1_B| G2_P| 0005| 7| 0.8018548921412443|
| G1_B| G2_Q| 0001| 7| 0.670090542740813|
| G1_B| G2_Q| 0002| 6| 0.5051134978717621|
| G1_B| G2_Q| 0003| 1| 0.16873516416387302|
| G1_B| G2_Q| 0004| 8| 0.7750478028867812|
| G1_B| G2_Q| 0005| 6| 0.9857364635291703|
| G1_B| G2_R| 0001| 8| 0.8956034505498771|
| G1_B| G2_R| 0002| 5| 0.9537748989951761|
| G1_B| G2_R| 0003| 0| 0.14952641909752684|
| G1_B| G2_R| 0004| 9| 0.3728857754552449|
| G1_B| G2_R| 0005| 8| 0.55145790830298|
| G1_B| G2_S| 0001| 9| 0.5261231425475038|
| G1_B| G2_S| 0002| 6| 0.6789322931505193|
| G1_B| G2_S| 0003| 2| 0.9682503963857059|
| G1_B| G2_S| 0004| 4| 0.21506064374959122|
| G1_B| G2_S| 0005| 4| 0.5521363246845827|
+------+------+--------+------+--------------------+
该group1
列有两个不同的值,G1_A
并且G1_B
具有group2
独特的价值['G2_P', 'G2_Q', 'G2_R', 'G2_S']
和
具有position
独特的价值['0001', '0002', '0003', '0004', '0005']
对于数据框中存在的每个组合,例如G1_A
x G2_P
,我只需要对前三个值进行采样position
-['0001', '0002', '0003']
这是我正在寻找的输出
+------+------+--------+------+--------------------+
|group1|group2|position|value1| value2|
+------+------+--------+------+--------------------+
| G1_A| G2_P| 0001| 6| 0.3217543124839014|
| G1_A| G2_P| 0002| 6| 0.4554057162820776|
| G1_A| G2_P| 0003| 8| 0.3801357655062654|
| G1_A| G2_Q| 0001| 7| 0.10188969920834146|
| G1_A| G2_Q| 0002| 9| 0.4900202258755447|
| G1_A| G2_Q| 0003| 9| 0.0570759385425319|
| G1_A| G2_R| 0001| 2| 0.18320914601531957|
| G1_A| G2_R| 0002| 0| 0.722470705002637|
| G1_A| G2_R| 0003| 6| 0.27988540796939354|
| G1_A| G2_S| 0001| 1| 0.6542679700270546|
| G1_A| G2_S| 0002| 9| 0.8858848000834335|
| G1_A| G2_S| 0003| 7| 0.5113964766224457|
| G1_B| G2_P| 0001| 1| 0.6098855780360801|
| G1_B| G2_P| 0002| 8|0.009644056732163175|
| G1_B| G2_P| 0003| 1| 0.9216012386238513|
| G1_B| G2_Q| 0001| 7| 0.670090542740813|
| G1_B| G2_Q| 0002| 6| 0.5051134978717621|
| G1_B| G2_Q| 0003| 1| 0.16873516416387302|
| G1_B| G2_R| 0001| 8| 0.8956034505498771|
| G1_B| G2_R| 0002| 5| 0.9537748989951761|
| G1_B| G2_R| 0003| 0| 0.14952641909752684|
| G1_B| G2_S| 0001| 9| 0.5261231425475038|
| G1_B| G2_S| 0002| 6| 0.6789322931505193|
| G1_B| G2_S| 0003| 2| 0.9682503963857059|
+------+------+--------+------+--------------------+
我当前的方法将数据框拆分为较小的数据框,我需要将这些数据框合并以获得完整的数据框
listids = [list(x.asDict().values()) for x in spark_df.select("group1", "group2").distinct().collect()]
dfArray = [spark_df.filter((col("group1") == x[0]) &
(col("group2") == x[1]) &
(col("position").isin(['0001', '0002', '0003'])))
for x in listids]
有没有更好的方法来执行这些操作?
这种方法的问题是它需要我对position
条件进行硬编码。我在过滤步骤中使用了这个条件(col("position").isin(['0001','0002', '0003'])))
。
我不会总是有幸知道我需要选择的精确位置。另一种方法是使用 a.collect
获取位置编号,对其进行排序然后过滤它们,但这是一项昂贵的操作。我已经在使用 a.collect
来获取不同的组组合
我正在寻找一种对.collect
操作依赖最少的方法
附录
这是生成我使用的火花数据帧的代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import numpy as np
import pandas as pd
cols = ["group1", "group2", "position"]
group1 = ['G1_A', 'G1_B']
group2 = ['G2_P', 'G2_Q','G2_R', 'G2_S']
positions = ['000' + str(i + 1) for i in range(5)]
group_data = [[g1, g2, position] for g1 in group1 for g2 in group2 for position in positions]
df = pd.DataFrame(group_data, columns= cols)
np.random.seed(42)
df['value1'] = np.random.randint(0, 10, df.shape[0])
np.random.seed(42)
df['value2'] = np.random.rand(df.shape[0])
spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df.values.tolist(), df.columns.tolist())
解决方案
您可以使用窗口函数。
partitionBy("group1", "group2")
为您提供包含所有列组合的组。orderBy("position")
对每个组内的行进行排序。row_number()
根据顺序为您提供每个组内的行号。根据逻辑,使用其他功能可能会更好。(即rank()
,dense_rank()
)
之后,您有一个“row_num”列,每个组内都有多个位置,您可以过滤数据。
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql import Window
df = spark.createDataFrame([
("G1_A", "G2_P", "0001", 6, 0.3217543124839014),
("G1_A", "G2_P", "0002", 6, 0.4554057162820776),
("G1_A", "G2_P", "0003", 8, 0.3801357655062654),
("G1_A", "G2_P", "0004", 1, 0.8910865867971118),
("G1_A", "G2_P", "0005", 5, 0.04929044804190086),
("G1_A", "G2_Q", "0001", 7, 0.10188969920834146),
("G1_A", "G2_Q", "0002", 9, 0.4900202258755447),
("G1_A", "G2_Q", "0003", 9, 0.0570759385425319),
("G1_A", "G2_Q", "0004", 0, 0.8638132568329479),
("G1_A", "G2_Q", "0005", 8, 0.5631513545869068),
("G1_A", "G2_R", "0001", 2, 0.18320914601531957),
("G1_A", "G2_R", "0002", 0, 0.722470705002637),
("G1_A", "G2_R", "0003", 6, 0.27988540796939354),
("G1_A", "G2_R", "0004", 7, 0.13827103885498537),
("G1_A", "G2_R", "0005", 6, 0.8410548211059407),
("G1_A", "G2_S", "0001", 1, 0.6542679700270546),
("G1_A", "G2_S", "0002", 9, 0.8858848000834335),
("G1_A", "G2_S", "0003", 7, 0.5113964766224457),
("G1_A", "G2_S", "0004", 9, 0.7758283878692317),
("G1_A", "G2_S", "0005", 4, 0.011421066938733127),
("G1_B", "G2_P", "0001", 1, 0.6098855780360801),
("G1_B", "G2_P", "0002", 8, 0.009644056732163175),
("G1_B", "G2_P", "0003", 1, 0.9216012386238513),
("G1_B", "G2_P", "0004", 7, 0.8658947151731069),
("G1_B", "G2_P", "0005", 7, 0.8018548921412443),
("G1_B", "G2_Q", "0001", 7, 0.670090542740813),
("G1_B", "G2_Q", "0002", 6, 0.5051134978717621),
("G1_B", "G2_Q", "0003", 1, 0.16873516416387302),
("G1_B", "G2_Q", "0004", 8, 0.7750478028867812),
("G1_B", "G2_Q", "0005", 6, 0.9857364635291703),
("G1_B", "G2_R", "0001", 8, 0.8956034505498771),
("G1_B", "G2_R", "0002", 5, 0.9537748989951761),
("G1_B", "G2_R", "0003", 0, 0.14952641909752684),
("G1_B", "G2_R", "0004", 9, 0.3728857754552449),
("G1_B", "G2_R", "0005", 8, 0.55145790830298),
("G1_B", "G2_S", "0001", 9, 0.5261231425475038),
("G1_B", "G2_S", "0002", 6, 0.6789322931505193),
("G1_B", "G2_S", "0003", 2, 0.9682503963857059),
("G1_B", "G2_S", "0004", 4, 0.21506064374959122),
("G1_B", "G2_S", "0005", 4, 0.5521363246845827)],
["group1", "group2", "position", "value1", "value2"])
window_spec = Window \
.partitionBy("group1", "group2") \
.orderBy("position")
res = df \
.withColumn("row_num", row_number().over(window_spec)) \
.where(col("row_num") <= 3) \
.drop("row_num")
res.show()
输出:
+------+------+--------+------+--------------------+
|group1|group2|position|value1| value2|
+------+------+--------+------+--------------------+
| G1_A| G2_P| 0001| 6| 0.3217543124839014|
| G1_A| G2_P| 0002| 6| 0.4554057162820776|
| G1_A| G2_P| 0003| 8| 0.3801357655062654|
| G1_A| G2_Q| 0001| 7| 0.10188969920834146|
| G1_A| G2_Q| 0002| 9| 0.4900202258755447|
| G1_A| G2_Q| 0003| 9| 0.0570759385425319|
| G1_A| G2_R| 0001| 2| 0.18320914601531957|
| G1_A| G2_R| 0002| 0| 0.722470705002637|
| G1_A| G2_R| 0003| 6| 0.27988540796939354|
| G1_A| G2_S| 0001| 1| 0.6542679700270546|
| G1_A| G2_S| 0002| 9| 0.8858848000834335|
| G1_A| G2_S| 0003| 7| 0.5113964766224457|
| G1_B| G2_P| 0001| 1| 0.6098855780360801|
| G1_B| G2_P| 0002| 8|0.009644056732163175|
| G1_B| G2_P| 0003| 1| 0.9216012386238513|
| G1_B| G2_Q| 0001| 7| 0.670090542740813|
| G1_B| G2_Q| 0002| 6| 0.5051134978717621|
| G1_B| G2_Q| 0003| 1| 0.16873516416387302|
| G1_B| G2_R| 0001| 8| 0.8956034505498771|
| G1_B| G2_R| 0002| 5| 0.9537748989951761|
+------+------+--------+------+--------------------+
推荐阅读
- python - 即使在 python3 中使用 encoding=utf-8 也无法编码字符 '\u0144'
- python - Plotly Dash:编辑背景图像
- kotlin - 如何用编程语言解密记事本++ nppcrypt加密消息
- excel - excel中是否有任何功能可以锁定标题以进行打印?
- javascript - 如何使用脚本编辑器在 Google 表格中选择和删除从 ListView(MIT App Inventor)中选择的行?
- python - 使用 Usercreationform 时,额外的用户注册信息无法存储在数据库中,例如 phoneno playergameid 排除用户名、django 中的密码
- flutter - 如何使所有容器在列内具有均匀的高度?
- powerbi - DAX - 达到目标的月份百分比
- swift - Swiftui - 将 sk3dnode 添加到 spriteview 中未在 watchOS 上显示
- shopify - 当用户在 shopify 的应用中心单击应用程序名称时,我可以在哪里设置用户将重定向的 URL?