首页 > 解决方案 > 动态填充pyspark数据框中列中的行

问题描述

我想要这样的东西:

Id  A  B  Flag  COL
 1  5  4   0     0
 1  5  8   1     1
 1  6  4   0     1
 1  4  7   1     2
 2  7  6   0     0
 2  8  9   1     1
 2  3  8   1     2

我有必须根据 id 进行分区的数据框,并且我有基于条件的标志(A<B,然后 1),我需要根据前一行获取列中的行。逻辑是如果 flag 为 1,那么 COL 将是前一行值+1,否则如果 flag 为 0,COL 将是前一行列本身的值。PS-我们在df中没有COL列,我是根据上述逻辑创建它的。我的输出应该像上面提到的表格。

标签: pysparkapache-spark-sqlpyspark-dataframes

解决方案


考虑到我所做的不同评论,这是我基于有效数据集的解决方案:

from pyspark.sql import functions as F, Window

df.show()  # Without columns parition and order, it is impossible to compute COL

+---------+-----+---+---+
|partition|order|  A|  B|
+---------+-----+---+---+
|        1|    1|  5|  4|
|        1|    2|  5|  8|
|        1|    3|  6|  4|
|        1|    4|  4|  7|
|        2|    1|  7|  6|
|        2|    2|  8|  9|
|        2|    3|  3|  8|
+---------+-----+---+---+

df.withColumn("flag", F.when(F.col("A") < F.col("B"), 1).otherwise(0)).withColumn(
    "COL",
    F.sum("flag").over(
        Window.partitionBy("partition").orderBy(
            "order"
        )  # Window is the reason why we need these two columns
    ),
).show()

+---------+-----+---+---+----+---+
|partition|order|  A|  B|flag|COL|
+---------+-----+---+---+----+---+
|        1|    1|  5|  4|   0|  0|
|        1|    2|  5|  8|   1|  1|
|        1|    3|  6|  4|   0|  1|
|        1|    4|  4|  7|   1|  2|
|        2|    1|  7|  6|   0|  0|
|        2|    2|  8|  9|   1|  1|
|        2|    3|  3|  8|   1|  2|
+---------+-----+---+---+----+---+

推荐阅读