python - Using Spark to create an "EventID" column to help distinguish different sets of records
Problem description
My current dataset
+---+-----+
| ID|Event|
+---+-----+
| 1| run|
| 1| run|
| 1| run|
| 1| swim|
| 1| run|
| 1| swim|
| 1| swim|
| 1| run|
| 1| run|
| 1| run|
| 1| swim|
| 1| run|
| 1| run|
| 2| run|
| 2| run|
| 2| run|
| 2| swim|
| 2| run|
| 2| swim|
| 2| swim|
| 2| swim|
| 2| swim|
| 2| run|
| 3| run|
| 3| run|
| 3| swim|
+---+-----+
The dataset I'm after
+---+-----+-------+
| ID|Event|EventID|
+---+-----+-------+
| 1| run| 1|
| 1| run| 1|
| 1| run| 1|
| 1| swim| 1|
| 1| run| 2|
| 1| swim| 2|
| 1| swim| 2|
| 1| run| 3|
| 1| run| 3|
| 1| run| 3|
| 1| swim| 3|
| 1| run| 4|
| 1| run| 4|
| 2| run| 1|
| 2| run| 1|
| 2| run| 1|
| 2| swim| 1|
| 2| run| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| run| 3|
| 3| run| 1|
| 3| run| 1|
| 3| swim| 1|
+---+-----+-------+
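My question
Would anyone mind helping me with (or pointing me to a reference for) the function needed to create the EventID column above? I believe I could achieve this by applying some kind of groupby on the ID column and then using a lambda expression to track the pattern in the Event column?
For each unique ID, a new EventID starts when the previous Event record is "swim" and the current Event record is "run". Note that multiple "run" and "swim" values can belong to the same EventID.
Any advice or guidance is much appreciated.
Thank you.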
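To make the rule concrete, here is a minimal plain-Python sketch (not Spark). The function name `assign_event_ids` is hypothetical, and it assumes the rows arrive in their original order:

```python
def assign_event_ids(rows):
    """Return (ID, Event, EventID) tuples for (ID, Event) rows given in order."""
    result = []
    state = {}  # ID -> (previous Event, current EventID)
    for id_, event in rows:
        prev_event, event_id = state.get(id_, (None, 1))
        # A new EventID starts only on a "swim" -> "run" transition within an ID.
        if prev_event == "swim" and event == "run":
            event_id += 1
        state[id_] = (event, event_id)
        result.append((id_, event, event_id))
    return result


sample = [
    (1, "run"), (1, "swim"), (1, "run"), (1, "swim"),
    (1, "swim"), (1, "run"), (2, "run"), (2, "swim"),
]
for row in assign_event_ids(sample):
    print(row)
```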
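Current code to generate the original/sample dataset
# Assumes an active SparkSession named `spark`
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# My current dataset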
data = [
(1, "run"),
(1, "run"),
(1, "run"),
(1, "swim"),
(1, "run"),
(1, "swim"),
(1, "swim"),
(1, "run"),
(1, "run"),
(1, "run"),
(1, "swim"),
(1, "run"),
(1, "run"),
(2, "run"),
(2, "run"),
(2, "run"),
(2, "swim"),
(2, "run"),
(2, "swim"),
(2, "swim"),
(2, "swim"),
(2, "swim"),
(2, "run"),
(3, "run"),
(3, "run"),
(3, "swim")
]
schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('Event', StringType(), True)
])
df = spark.createDataFrame(data=data, schema=schema)
df.show(30)
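Solution
Any approach based on a global row number limits Spark's parallelism: a window with no partitionBy moves all rows into a single partition, so that step runs on one executor. If your dataset is large, the solution below will not scale and you may need to look for something else. It numbers the rows with zipWithIndex and then applies two levels of window functions.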
from pyspark.sql.window import Window
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
data = [
(1, "run"),
(1, "run"),
(1, "run"),
(1, "swim"),
(1, "run"),
(1, "swim"),
(1, "swim"),
(1, "run"),
(1, "run"),
(1, "run"),
(1, "swim"),
(1, "run"),
(1, "run"),
(2, "run"),
(2, "run"),
(2, "run"),
(2, "swim"),
(2, "run"),
(2, "swim"),
(2, "swim"),
(2, "swim"),
(2, "swim"),
(2, "run"),
(3, "run"),
(3, "run"),
(3, "swim")
]
schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('Event', StringType(), True)
])
df = spark.createDataFrame(data=data, schema=schema)
# Number the rows so the original order can be used in window functions.
# "value" holds the original row as a struct (ID, Event); "index" is its position.
df_1 = df.rdd.zipWithIndex().toDF(['value', 'index'])

w = Window.partitionBy('value').orderBy('index')  # one partition per (ID, Event) pair
w1 = Window.orderBy('index')  # no partitionBy: all rows move to a single partition

# First level: flag rows whose (ID, Event) differs from the previous row, then take a
# running sum so each consecutive block of identical rows gets its own group number.
df_1 = df_1.withColumn("valueChange", (f.col("value") != f.lag("value").over(w1)).cast("int")) \
    .fillna(0, subset=["valueChange"]) \
    .withColumn("group", f.sum(f.col("valueChange")).over(w1.rangeBetween(Window.unboundedPreceding, 0)))

# Second level: within each (ID, Event) partition, count how often the group number changes.
# The k-th "run" block and the k-th "swim" block of an ID end up with the same count, which
# matches the rule as long as each ID's first event is "run", as in the sample data.
df_1 = df_1.withColumn("groupValueChange", (f.col("group") != f.lag("group").over(w)).cast("int")) \
    .fillna(0, subset=["groupValueChange"]) \
    .withColumn("secondgroup", f.sum(f.col("groupValueChange")).over(w.rangeBetween(Window.unboundedPreceding, 0)))
df_1.createOrReplaceTempView("GroupedNumbered")

# Select only the needed columns; show(30) prints all 26 rows (show() stops at 20)
df_final = spark.sql("select value.ID, value.Event, secondgroup + 1 as EventID from GroupedNumbered order by index")
df_final.show(30)
Result produced
+---+-----+-------+
| ID|Event|EventID|
+---+-----+-------+
| 1| run| 1|
| 1| run| 1|
| 1| run| 1|
| 1| swim| 1|
| 1| run| 2|
| 1| swim| 2|
| 1| swim| 2|
| 1| run| 3|
| 1| run| 3|
| 1| run| 3|
| 1| swim| 3|
| 1| run| 4|
| 1| run| 4|
| 2| run| 1|
| 2| run| 1|
| 2| run| 1|
| 2| swim| 1|
| 2| run| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| swim| 2|
| 2| run| 3|
| 3| run| 1|
| 3| run| 1|
| 3| swim| 1|
+---+-----+-------+