python - 根据另一列值分配 ID
问题描述
有没有办法根据 state 列的值分配操作 id?目的是为每个 start-on-end 序列分配递增的 ID。例如:在下表中,从 2020-09-15 22:49 开始的初始操作的操作 id = 1,直到操作结束的所有行也将采用 id 1。每个开始/结束状态开始和结束之间的所有“开启”状态都将具有相同的 ID。
Timestamp |state | operation id
----------------------------------------
2020-09-15 22:53 start 1
2020-09-16 22:53 on 1
2020-09-17 22:53 on 1
2020-09-18 22:53 on 1
2020-09-19 22:53 end 1
2020-09-20 22:53 off null
2020-09-21 22:53 off null
2020-09-22 22:53 off null
2020-09-23 22:53 start 2
2020-09-24 22:53 on 2
2020-09-25 22:53 end 2
2020-09-26 22:53 start 3
2020-09-27 22:53 end 3
时间戳和状态列可用。目的是构建操作 id 列。
解决方案
您可以使用Window
按“时间戳”排序的函数。由于您希望operation_id
始终null
在“状态”为“关闭”时保持状态,因此我将过滤状态“关闭”行并将其作为单独的数据框。我们将 'start' 指定为1
, 'on' 指定为0
, 'end' 指定为2
首先,incremental sum
在您的窗口上获得一个新的数字分配“状态”列。对应于“incremental sum
结束”状态将始终是 3 的倍数。这也将是您的“序列结束”
为了得到你需要的东西,你必须lag
在列上使用一个函数incremental sum
,然后用滞后值替换 3s 的倍数。最后一步是除以 3,将其转换为整数并加 1。
现在联合df_not_off
并df_off
用于最终输出
您的数据框:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
schema = StructType([StructField("Timestamp", IntegerType()), StructField("state", StringType())])
data = [[1, 'start'], [2, 'on'], [3, 'on'], [4, 'on'], [5, 'end'], [6, 'off'], [7, 'off'], \
[8, 'off'], [9, 'start'], [10, 'on'], [11, 'end'], [12, 'start'], [13, 'end']]
df = spark.createDataFrame(data,schema=schema)
df.show()
操作:
df_off = df.filter(col("state")=="off")
df_not_off = df.filter(col("state")!="off")
df_not_off = df_not_off.withColumn("state_num", F.when(col("state")=="start", 1).when(col("state")=="on", 0).otherwise(2))
w=Window().orderBy("Timestamp")
df_not_off = df_not_off.withColumn("incremental_sum", F.sum("state_num").over(w))\
.withColumn("lag", F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
.withColumn("incremental_sum", ((F.col("incremental_sum")/3).cast('integer'))+1)\
.withColumnRenamed("incremental_sum", "operation_id")\
.drop("state_num", "lag")
df_off = df_off.withColumn("operation_id", F.lit(None))
final_df = df_not_off.union(df_off)
final_df.orderBy("Timestamp").show()
输出:
+---------+-----+------------+
|Timestamp|state|operation_id|
+---------+-----+------------+
| 1|start| 1|
| 2| on| 1|
| 3| on| 1|
| 4| on| 1|
| 5| end| 1|
| 6| off| null|
| 7| off| null|
| 8| off| null|
| 9|start| 2|
| 10| on| 2|
| 11| end| 2|
| 12|start| 3|
| 13| end| 3|
+---------+-----+------------+
推荐阅读
- sql - 避免插入的触发器问题(Oracle SQL)
- php - 序列化来自 Symfony 中实体的嵌套对象的数组/对象
- f# - 如何使用 FSharp.Compiler.Service 从 F# 字符串中获取 F# AST?
- pandas - dtype int 的数据框中的 NaN 值未填充
- python - 无效的语法头 Python
- python - 字典中的嵌套列表
- sql - 如果列具有 xyz 以外的任何条目,则从输出中排除
- django - Django:将参数传递给 URL 模式名称
- python - MatPlotLib:如何在堆积条形图的每个条上显示总和标签?
- validation - 雪花复制命令结果扫描不一致