首页 > 解决方案 > 为重复的行序列分配一个标识符

问题描述

我有一个数据框,我需要在其中生成“CycleID”列,如下所示:

+-------+-------+----------+---------+
| type  | stage | Timestamp| CycleID |
+-------+-------+----------+---------+
| type1 | s1    | a        | 1       |
| type1 | s2    | b        | 1       |
| type1 | s2    | c        | 1       |
| type1 | s3    | d        | 1       |
| type1 | s1    | e        | 2       |
| type1 | s2    | f        | 2       |
| type1 | s3    | g        | 2       |
| type2 | s1    | a        | 1       |
| type2 | s2    | b        | 1       |
| type2 | s3    | c        | 1       |
+-------+-------+----------+---------+

数据约束

  1. 一种类型的每个周期都有 3 个按顺序发生的预定阶段。
  2. 一个周期内的各个阶段可以重复,但不能乱序发生。例如, stages1永远不会出现在 stage 之后s2
  3. 时间戳保证在每个阶段的行之间递增。例如:b > a

目标是创建一个新列“ CycleID”,以唯一标识每种类型的循环。

到目前为止我已经尝试过:

w = Window.partitionBy("type").orderBy("Timestamp")
inputdf = inputdf.withColumn("stagenum", func.expr("substring(stage, 2)")).withColumn("stagenum", col("stagenum").cast(IntegerType()))
inputdf = inputdf.withColumn("temp", func.when((col("stagenum") - func.lag("stagenum", 1).over(w)).isNull() | \
                                                (col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(0)) |\
                                                (col("stagenum") - func.lag("stagenum", 1).over(w) == func.lit(1)), func.lit(1)).otherwise(func.lit(100)))

除此之外,我用 lag() 尝试了不同的方法,但似乎没有一种干净的方法来分配 CycleId。

寻求帮助。

标签: apache-sparkpysparkcumsum

解决方案


数据

   l=[('type1' , 's1'    , 'a'        , 1),('type1','s2'    , 'b'        , 1  ),('type1' , 's1'    , 'a'        , 1),('type1','s2'    , 'b'        , 1  ), ('type1' , 's2'    , 'c'        , 1), ('type1' , 's3'    , 'd'        , 1),('type1' , 's1'    , 'e'        , 1),('type1','s2'    , 'f'        , 1  ), ('type1' , 's3'    , 'g'        , 1)]
df=spark.createDataFrame(l,['type'  , 'stage' , 'Timestamp', 'CycleID'])
df.show()

解决方案

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import *


df=(
 df.withColumn('CycleID',col('stage')=='s1')#Generate Booleans through Selection
 
 .withColumn('CycleID', F.sum(F.col('CycleID').cast('integer'))#Convert Boolean to intergers
             
             .over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)))#rowsBetween(-sys.maxsize, 0) along with sum function is used to create cumulative sum of the column 
)
df.show()


+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1|   s1|        a|      1|
|type1|   s2|        b|      1|
|type1|   s2|        c|      1|
|type1|   s3|        d|      1|
|type1|   s1|        e|      2|
|type1|   s2|        f|      2|
|type1|   s3|        g|      2|
+-----+-----+---------+-------+

根据您的以下评论:

请按降序排序和布尔选择s3。下面的代码

df.sort(col('Timestamp').desc()).withColumn('CycleID',(col('stage')=='s3')).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0))).show()

+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1|   s3|        g|      1|
|type1|   s2|        f|      1|
|type1|   s1|        e|      1|
|type1|   s3|        d|      2|
|type1|   s2|        c|      2|
|type1|   s2|        b|      2|
|type1|   s2|        b|      2|
|type1|   s1|        a|      2|
|type1|   s1|        a|      2|
+-----+-----+---------+-------+

如果你可能有多个 s3. 使用滞后如下;

 m=Window.partitionBy()#.orderBy(F.desc('Timestamp'))
df1=df.select("*", lag("stage").over(m.orderBy(col("Timestamp"))).alias("CycleID1"))
df1.withColumn('CycleID',(((col('stage')=='s1')&(col('CycleID1').isNull()))|((col('stage')=='s1')&(col('CycleID1')=='s3')))).withColumn('CycleID', F.sum(F.col('CycleID').cast('integer')).over(m.rowsBetween(-sys.maxsize, 0))).drop('CycleID1').show()
+-----+-----+---------+-------+
| type|stage|Timestamp|CycleID|
+-----+-----+---------+-------+
|type1|   s1|        a|      1|
|type1|   s1|        a|      1|
|type1|   s2|        b|      1|
|type1|   s2|        b|      1|
|type1|   s2|        c|      1|
|type1|   s3|        d|      1|
|type1|   s1|        e|      2|
|type1|   s2|        f|      2|
|type1|   s3|        g|      2|
+-----+-----+---------+-------+

推荐阅读