Pyspark sample dataframe across groups

Problem description

我有一个具有以下结构的数据框

+------+------+--------+------+--------------------+
|group1|group2|position|value1|              value2|
+------+------+--------+------+--------------------+
|  G1_A|  G2_P|    0001|     6|  0.3217543124839014|
|  G1_A|  G2_P|    0002|     6|  0.4554057162820776|
|  G1_A|  G2_P|    0003|     8|  0.3801357655062654|
|  G1_A|  G2_P|    0004|     1|  0.8910865867971118|
|  G1_A|  G2_P|    0005|     5| 0.04929044804190086|
|  G1_A|  G2_Q|    0001|     7| 0.10188969920834146|
|  G1_A|  G2_Q|    0002|     9|  0.4900202258755447|
|  G1_A|  G2_Q|    0003|     9|  0.0570759385425319|
|  G1_A|  G2_Q|    0004|     0|  0.8638132568329479|
|  G1_A|  G2_Q|    0005|     8|  0.5631513545869068|
|  G1_A|  G2_R|    0001|     2| 0.18320914601531957|
|  G1_A|  G2_R|    0002|     0|   0.722470705002637|
|  G1_A|  G2_R|    0003|     6| 0.27988540796939354|
|  G1_A|  G2_R|    0004|     7| 0.13827103885498537|
|  G1_A|  G2_R|    0005|     6|  0.8410548211059407|
|  G1_A|  G2_S|    0001|     1|  0.6542679700270546|
|  G1_A|  G2_S|    0002|     9|  0.8858848000834335|
|  G1_A|  G2_S|    0003|     7|  0.5113964766224457|
|  G1_A|  G2_S|    0004|     9|  0.7758283878692317|
|  G1_A|  G2_S|    0005|     4|0.011421066938733127|
|  G1_B|  G2_P|    0001|     1|  0.6098855780360801|
|  G1_B|  G2_P|    0002|     8|0.009644056732163175|
|  G1_B|  G2_P|    0003|     1|  0.9216012386238513|
|  G1_B|  G2_P|    0004|     7|  0.8658947151731069|
|  G1_B|  G2_P|    0005|     7|  0.8018548921412443|
|  G1_B|  G2_Q|    0001|     7|   0.670090542740813|
|  G1_B|  G2_Q|    0002|     6|  0.5051134978717621|
|  G1_B|  G2_Q|    0003|     1| 0.16873516416387302|
|  G1_B|  G2_Q|    0004|     8|  0.7750478028867812|
|  G1_B|  G2_Q|    0005|     6|  0.9857364635291703|
|  G1_B|  G2_R|    0001|     8|  0.8956034505498771|
|  G1_B|  G2_R|    0002|     5|  0.9537748989951761|
|  G1_B|  G2_R|    0003|     0| 0.14952641909752684|
|  G1_B|  G2_R|    0004|     9|  0.3728857754552449|
|  G1_B|  G2_R|    0005|     8|    0.55145790830298|
|  G1_B|  G2_S|    0001|     9|  0.5261231425475038|
|  G1_B|  G2_S|    0002|     6|  0.6789322931505193|
|  G1_B|  G2_S|    0003|     2|  0.9682503963857059|
|  G1_B|  G2_S|    0004|     4| 0.21506064374959122|
|  G1_B|  G2_S|    0005|     4|  0.5521363246845827|
+------+------+--------+------+--------------------+

The group1 column has two distinct values, G1_A and G1_B.

group2 has the distinct values ['G2_P', 'G2_Q', 'G2_R', 'G2_S'].

position has the distinct values ['0001', '0002', '0003', '0004', '0005'].

For every combination present in the dataframe, e.g. G1_A x G2_P, I need to sample only the first three values of position: ['0001', '0002', '0003'].

This is the output I am looking for:

+------+------+--------+------+--------------------+
|group1|group2|position|value1|              value2|
+------+------+--------+------+--------------------+
|  G1_A|  G2_P|    0001|     6|  0.3217543124839014|
|  G1_A|  G2_P|    0002|     6|  0.4554057162820776|
|  G1_A|  G2_P|    0003|     8|  0.3801357655062654|
|  G1_A|  G2_Q|    0001|     7| 0.10188969920834146|
|  G1_A|  G2_Q|    0002|     9|  0.4900202258755447|
|  G1_A|  G2_Q|    0003|     9|  0.0570759385425319|
|  G1_A|  G2_R|    0001|     2| 0.18320914601531957|
|  G1_A|  G2_R|    0002|     0|   0.722470705002637|
|  G1_A|  G2_R|    0003|     6| 0.27988540796939354|
|  G1_A|  G2_S|    0001|     1|  0.6542679700270546|
|  G1_A|  G2_S|    0002|     9|  0.8858848000834335|
|  G1_A|  G2_S|    0003|     7|  0.5113964766224457|
|  G1_B|  G2_P|    0001|     1|  0.6098855780360801|
|  G1_B|  G2_P|    0002|     8|0.009644056732163175|
|  G1_B|  G2_P|    0003|     1|  0.9216012386238513|
|  G1_B|  G2_Q|    0001|     7|   0.670090542740813|
|  G1_B|  G2_Q|    0002|     6|  0.5051134978717621|
|  G1_B|  G2_Q|    0003|     1| 0.16873516416387302|
|  G1_B|  G2_R|    0001|     8|  0.8956034505498771|
|  G1_B|  G2_R|    0002|     5|  0.9537748989951761|
|  G1_B|  G2_R|    0003|     0| 0.14952641909752684|
|  G1_B|  G2_S|    0001|     9|  0.5261231425475038|
|  G1_B|  G2_S|    0002|     6|  0.6789322931505193|
|  G1_B|  G2_S|    0003|     2|  0.9682503963857059|
+------+------+--------+------+--------------------+

My current approach splits the dataframe into smaller dataframes, which I then need to merge back together to get the complete dataframe (the merge step is sketched after the snippet below):

from pyspark.sql.functions import col

# collect the distinct (group1, group2) combinations to the driver
listids = [list(x.asDict().values()) for x in spark_df.select("group1", "group2").distinct().collect()]
# build one filtered dataframe per combination, keeping only the first three positions
dfArray = [spark_df.filter((col("group1") == x[0]) &
                           (col("group2") == x[1]) &
                           (col("position").isin(['0001', '0002', '0003'])))
           for x in listids]
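
The merge step would then union the per-combination dataframes back into one. A minimal sketch, assuming dfArray was built as above:

from functools import reduce

# fold the list of per-combination dataframes into a single dataframe;
# DataFrame.union requires identical schemas, which all elements of dfArray share
merged_df = reduce(lambda a, b: a.union(b), dfArray)
merged_df.show()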

Is there a better way to perform these operations?

The problem with this approach is that it requires me to hard-code the position condition, which I use in the filter step: (col("position").isin(['0001', '0002', '0003'])).

I will not always be lucky enough to know the exact positions I need to select. Another approach would be to use a .collect to get the position numbers, sort them and then filter on them, but that is an expensive operation, and I am already using a .collect to get the distinct group combinations.

I am looking for an approach that relies on .collect operations as little as possible.
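
For reference, the collect-based alternative described above would look roughly like this (a sketch only; it still pulls the distinct positions to the driver, which is what I want to avoid):

# collect, sort and slice the distinct positions on the driver (expensive on large data)
top_positions = sorted(r["position"] for r in spark_df.select("position").distinct().collect())[:3]
filtered_df = spark_df.filter(col("position").isin(top_positions))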

Appendix

Here is the code that generates the Spark dataframe I am using:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import numpy as np
import pandas as pd


# build every (group1, group2, position) combination
cols = ["group1", "group2", "position"]
group1 = ['G1_A', 'G1_B']
group2 = ['G2_P', 'G2_Q', 'G2_R', 'G2_S']
positions = ['000' + str(i + 1) for i in range(5)]
group_data = [[g1, g2, position] for g1 in group1 for g2 in group2 for position in positions]
df = pd.DataFrame(group_data, columns=cols)

# attach reproducible random values
np.random.seed(42)
df['value1'] = np.random.randint(0, 10, df.shape[0])
np.random.seed(42)
df['value2'] = np.random.rand(df.shape[0])

# convert the pandas dataframe to a Spark dataframe
spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df.values.tolist(), df.columns.tolist())

Tags: python, dataframe, apache-spark, pyspark

Solution


You can use window functions.

  1. partitionBy("group1", "group2") gives you groups for every combination of those columns.
  2. orderBy("position") sorts the rows within each group.
  3. row_number() gives each row a number within its group based on that ordering. Depending on your logic, another function may be a better fit (i.e. rank() or dense_rank(); a dense_rank variant is sketched after the output below).

After that you have a "row_num" column that numbers the positions within each group, and you can filter the data on it.

from pyspark.sql.functions import col, row_number
from pyspark.sql import Window

df = spark.createDataFrame([
    ("G1_A", "G2_P", "0001", 6, 0.3217543124839014),
    ("G1_A", "G2_P", "0002", 6, 0.4554057162820776),
    ("G1_A", "G2_P", "0003", 8, 0.3801357655062654),
    ("G1_A", "G2_P", "0004", 1, 0.8910865867971118),
    ("G1_A", "G2_P", "0005", 5, 0.04929044804190086),
    ("G1_A", "G2_Q", "0001", 7, 0.10188969920834146),
    ("G1_A", "G2_Q", "0002", 9, 0.4900202258755447),
    ("G1_A", "G2_Q", "0003", 9, 0.0570759385425319),
    ("G1_A", "G2_Q", "0004", 0, 0.8638132568329479),
    ("G1_A", "G2_Q", "0005", 8, 0.5631513545869068),
    ("G1_A", "G2_R", "0001", 2, 0.18320914601531957),
    ("G1_A", "G2_R", "0002", 0, 0.722470705002637),
    ("G1_A", "G2_R", "0003", 6, 0.27988540796939354),
    ("G1_A", "G2_R", "0004", 7, 0.13827103885498537),
    ("G1_A", "G2_R", "0005", 6, 0.8410548211059407),
    ("G1_A", "G2_S", "0001", 1, 0.6542679700270546),
    ("G1_A", "G2_S", "0002", 9, 0.8858848000834335),
    ("G1_A", "G2_S", "0003", 7, 0.5113964766224457),
    ("G1_A", "G2_S", "0004", 9, 0.7758283878692317),
    ("G1_A", "G2_S", "0005", 4, 0.011421066938733127),
    ("G1_B", "G2_P", "0001", 1, 0.6098855780360801),
    ("G1_B", "G2_P", "0002", 8, 0.009644056732163175),
    ("G1_B", "G2_P", "0003", 1, 0.9216012386238513),
    ("G1_B", "G2_P", "0004", 7, 0.8658947151731069),
    ("G1_B", "G2_P", "0005", 7, 0.8018548921412443),
    ("G1_B", "G2_Q", "0001", 7, 0.670090542740813),
    ("G1_B", "G2_Q", "0002", 6, 0.5051134978717621),
    ("G1_B", "G2_Q", "0003", 1, 0.16873516416387302),
    ("G1_B", "G2_Q", "0004", 8, 0.7750478028867812),
    ("G1_B", "G2_Q", "0005", 6, 0.9857364635291703),
    ("G1_B", "G2_R", "0001", 8, 0.8956034505498771),
    ("G1_B", "G2_R", "0002", 5, 0.9537748989951761),
    ("G1_B", "G2_R", "0003", 0, 0.14952641909752684),
    ("G1_B", "G2_R", "0004", 9, 0.3728857754552449),
    ("G1_B", "G2_R", "0005", 8, 0.55145790830298),
    ("G1_B", "G2_S", "0001", 9, 0.5261231425475038),
    ("G1_B", "G2_S", "0002", 6, 0.6789322931505193),
    ("G1_B", "G2_S", "0003", 2, 0.9682503963857059),
    ("G1_B", "G2_S", "0004", 4, 0.21506064374959122),
    ("G1_B", "G2_S", "0005", 4, 0.5521363246845827)],
    ["group1", "group2", "position", "value1", "value2"])

# number the rows within each (group1, group2) group, ordered by position
window_spec = Window \
    .partitionBy("group1", "group2") \
    .orderBy("position")

# keep the first three positions per group, then drop the helper column
res = df \
    .withColumn("row_num", row_number().over(window_spec)) \
    .where(col("row_num") <= 3) \
    .drop("row_num")

res.show()

Output:

+------+------+--------+------+--------------------+
|group1|group2|position|value1|              value2|
+------+------+--------+------+--------------------+
|  G1_A|  G2_P|    0001|     6|  0.3217543124839014|
|  G1_A|  G2_P|    0002|     6|  0.4554057162820776|
|  G1_A|  G2_P|    0003|     8|  0.3801357655062654|
|  G1_A|  G2_Q|    0001|     7| 0.10188969920834146|
|  G1_A|  G2_Q|    0002|     9|  0.4900202258755447|
|  G1_A|  G2_Q|    0003|     9|  0.0570759385425319|
|  G1_A|  G2_R|    0001|     2| 0.18320914601531957|
|  G1_A|  G2_R|    0002|     0|   0.722470705002637|
|  G1_A|  G2_R|    0003|     6| 0.27988540796939354|
|  G1_A|  G2_S|    0001|     1|  0.6542679700270546|
|  G1_A|  G2_S|    0002|     9|  0.8858848000834335|
|  G1_A|  G2_S|    0003|     7|  0.5113964766224457|
|  G1_B|  G2_P|    0001|     1|  0.6098855780360801|
|  G1_B|  G2_P|    0002|     8|0.009644056732163175|
|  G1_B|  G2_P|    0003|     1|  0.9216012386238513|
|  G1_B|  G2_Q|    0001|     7|   0.670090542740813|
|  G1_B|  G2_Q|    0002|     6|  0.5051134978717621|
|  G1_B|  G2_Q|    0003|     1| 0.16873516416387302|
|  G1_B|  G2_R|    0001|     8|  0.8956034505498771|
|  G1_B|  G2_R|    0002|     5|  0.9537748989951761|
+------+------+--------+------+--------------------+
only showing top 20 rows
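
As mentioned in point 3 above, if the same position could appear more than once within a (group1, group2) group, a dense_rank-based variant would keep every row belonging to the first three distinct positions instead of exactly three rows per group. A minimal sketch under that assumption:

from pyspark.sql.functions import col, dense_rank
from pyspark.sql import Window

# dense_rank assigns equal ranks to rows that share the same position,
# so all rows for the first three distinct positions are kept
window_spec = Window.partitionBy("group1", "group2").orderBy("position")

res_dense = df \
    .withColumn("pos_rank", dense_rank().over(window_spec)) \
    .where(col("pos_rank") <= 3) \
    .drop("pos_rank")

res_dense.show()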
