首页 > 解决方案 > 根据另一列值分配 ID

问题描述

有没有办法根据 state 列的值分配操作 id?目的是为每个 start-on-end 序列分配递增的 ID。例如:在下表中,从 2020-09-15 22:49 开始的初始操作的操作 id = 1,直到操作结束的所有行也将采用 id 1。每个开始/结束状态开始和结束之间的所有“开启”状态都将具有相同的 ID。

Timestamp         |state | operation id
----------------------------------------
2020-09-15 22:53    start   1
2020-09-16 22:53    on      1
2020-09-17 22:53    on      1
2020-09-18 22:53    on      1
2020-09-19 22:53    end     1
2020-09-20 22:53    off     null
2020-09-21 22:53    off     null
2020-09-22 22:53    off     null
2020-09-23 22:53    start   2
2020-09-24 22:53    on      2
2020-09-25 22:53    end     2
2020-09-26 22:53    start   3
2020-09-27 22:53    end     3

时间戳和状态列可用。目的是构建操作 id 列。

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


您可以使用Window按“时间戳”排序的函数。由于您希望operation_id始终null在“状态”为“关闭”时保持状态,因此我将过滤状态“关闭”行并将其作为单独的数据框。我们将 'start' 指定为1, 'on' 指定为0, 'end' 指定为2

首先,incremental sum在您的窗口上获得一个新的数字分配“状态”列。对应于“incremental sum结束”状态将始终是 3 的倍数。这也将是您的“序列结束”

为了得到你需要的东西,你必须lag在列上使用一个函数incremental sum,然后用滞后值替换 3s 的倍数。最后一步是除以 3,将其转换为整数并加 1。

现在联合df_not_offdf_off用于最终输出

您的数据框:

from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *

schema = StructType([StructField("Timestamp", IntegerType()), StructField("state", StringType())])

data = [[1, 'start'], [2, 'on'], [3, 'on'], [4, 'on'], [5, 'end'], [6, 'off'], [7, 'off'], \
        [8, 'off'], [9, 'start'], [10, 'on'], [11, 'end'], [12, 'start'], [13, 'end']]

df = spark.createDataFrame(data,schema=schema)

df.show()

操作:

df_off = df.filter(col("state")=="off")
df_not_off = df.filter(col("state")!="off")
df_not_off = df_not_off.withColumn("state_num", F.when(col("state")=="start", 1).when(col("state")=="on", 0).otherwise(2))

w=Window().orderBy("Timestamp")

df_not_off = df_not_off.withColumn("incremental_sum", F.sum("state_num").over(w))\
  .withColumn("lag", F.lag("incremental_sum").over(w))\
  .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
  .withColumn("incremental_sum", ((F.col("incremental_sum")/3).cast('integer'))+1)\
  .withColumnRenamed("incremental_sum", "operation_id")\
  .drop("state_num", "lag")

df_off = df_off.withColumn("operation_id", F.lit(None))

final_df = df_not_off.union(df_off)

final_df.orderBy("Timestamp").show()

输出:

+---------+-----+------------+                                                  
|Timestamp|state|operation_id|
+---------+-----+------------+
|        1|start|           1|
|        2|   on|           1|
|        3|   on|           1|
|        4|   on|           1|
|        5|  end|           1|
|        6|  off|        null|
|        7|  off|        null|
|        8|  off|        null|
|        9|start|           2|
|       10|   on|           2|
|       11|  end|           2|
|       12|start|           3|
|       13|  end|           3|
+---------+-----+------------+

推荐阅读