Using Spark to create an "EventID" column to help distinguish different sets of records

Problem description

My current dataset

+---+-----+
| ID|Event|
+---+-----+
|  1|  run|
|  1|  run|
|  1|  run|
|  1| swim|
|  1|  run|
|  1| swim|
|  1| swim|
|  1|  run|
|  1|  run|
|  1|  run|
|  1| swim|
|  1|  run|
|  1|  run|
|  2|  run|
|  2|  run|
|  2|  run|
|  2| swim|
|  2|  run|
|  2| swim|
|  2| swim|
|  2| swim|
|  2| swim|
|  2|  run|
|  3|  run|
|  3|  run|
|  3| swim|
+---+-----+

The dataset I'm after

+---+-----+-------+
| ID|Event|EventID|
+---+-----+-------+
|  1|  run|      1|
|  1|  run|      1|
|  1|  run|      1|
|  1| swim|      1|
|  1|  run|      2|
|  1| swim|      2|
|  1| swim|      2|
|  1|  run|      3|
|  1|  run|      3|
|  1|  run|      3|
|  1| swim|      3|
|  1|  run|      4|
|  1|  run|      4|
|  2|  run|      1|
|  2|  run|      1|
|  2|  run|      1|
|  2| swim|      1|
|  2|  run|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2|  run|      3|
|  3|  run|      1|
|  3|  run|      1|
|  3| swim|      1|
+---+-----+-------+

My question

Would anyone mind helping me (or pointing me to a reference) with the function needed to create the EventID column above? I believe I could do this by applying some kind of groupby on the ID column and then using a lambda expression to track the pattern in the Event column?

For each unique ID, the rule for creating an EventID is that the previous Event record must be 'swim' and the current Event record must be 'run'. Note that multiple 'run' and 'swim' values may belong to the same EventID.
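
As a minimal sketch of this rule (plain Python only, not Spark; assign_event_ids is a hypothetical helper introduced here purely for illustration, assuming the rows arrive in the order shown above):

# Sketch of the EventID rule in plain Python (illustration only, not Spark).
# A new EventID starts whenever the previous Event for the same ID was "swim"
# and the current Event is "run"; the first row of each ID starts at EventID 1.
def assign_event_ids(rows):
    prev_event = {}   # last Event seen per ID
    event_id = {}     # current EventID per ID
    result = []
    for id_, event in rows:
        if id_ not in event_id:
            event_id[id_] = 1
        elif prev_event[id_] == "swim" and event == "run":
            event_id[id_] += 1
        prev_event[id_] = event
        result.append((id_, event, event_id[id_]))
    return result

# e.g. assign_event_ids([(1, "run"), (1, "swim"), (1, "run")])
# returns [(1, "run", 1), (1, "swim", 1), (1, "run", 2)]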

Any advice or guidance is much appreciated.

Thank you all.

Current code to generate the original/sample dataset

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# My current dataset

data = [
       (1, "run"),
       (1, "run"),
       (1, "run"),
       (1, "swim"),
       (1, "run"),
       (1, "swim"),
       (1, "swim"),
       (1, "run"),
       (1, "run"),
       (1, "run"),
       (1, "swim"),
       (1, "run"),
       (1, "run"),
       (2, "run"),
       (2, "run"),
       (2, "run"),
       (2, "swim"),
       (2, "run"),
       (2, "swim"),
       (2, "swim"),
       (2, "swim"),
       (2, "swim"),
       (2, "run"),
       (3, "run"),
       (3, "run"),
       (3, "swim")
        ]

schema = StructType([
  StructField('ID', IntegerType(), True),
  StructField('Event', StringType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.show(30)

Tags: python, sql, apache-spark, pyspark, apache-spark-sql

Solution


Any approach based on a global Spark row number has a limitation: an unpartitioned window pulls every row onto a single executor, which defeats Spark's parallelism. If your dataset is large, the solution below will not scale well and you may need to look for something else (a per-ID variant is sketched after the result table below). The solution below uses a couple of window functions together with a row number.

from pyspark.sql.window import Window
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# assumes an existing SparkSession named `spark`, as in the sample-data code above
data = [
       (1, "run"),
       (1, "run"),
       (1, "run"),
       (1, "swim"),
       (1, "run"),
       (1, "swim"),
       (1, "swim"),
       (1, "run"),
       (1, "run"),
       (1, "run"),
       (1, "swim"),
       (1, "run"),
       (1, "run"),
       (2, "run"),
       (2, "run"),
       (2, "run"),
       (2, "swim"),
       (2, "run"),
       (2, "swim"),
       (2, "swim"),
       (2, "swim"),
       (2, "swim"),
       (2, "run"),
       (3, "run"),
       (3, "run"),
       (3, "swim")
        ]

schema = StructType([
  StructField('ID', IntegerType(), True),
  StructField('Event', StringType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
# w  : window per distinct row value (the (ID, Event) pair), ordered by line number
# w1 : unpartitioned window over all rows, ordered by line number
w = Window.partitionBy('value').orderBy('index')
w1 = Window.orderBy('index')
# Assigning a line number so the original row order is preserved
df_1 = df.rdd.zipWithIndex().toDF(['value', 'index'])
# First pass: with the unpartitioned window, flag every row whose value differs
# from the previous row, then running-sum the flags into a global group number
df_1 = df_1.withColumn("valueChange", (f.col("value") != f.lag("value").over(w1)).cast("int")) \
        .fillna(0, subset=["valueChange"]) \
        .withColumn("indicator", (f.col("valueChange") != 0).cast("int")) \
        .withColumn("group", f.sum(f.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0)))
# Second pass: within each partition of identical (ID, Event) values, flag rows whose
# first-pass group number differs from the previous row, then running-sum those flags
df_1 = df_1.withColumn("groupValueChange", (f.col("group") != f.lag("group").over(w)).cast("int")) \
        .fillna(0, subset=["groupValueChange"]) \
        .withColumn("groupIndicator", (f.col("groupValueChange") != 0).cast("int")) \
        .withColumn("secondgroup", f.sum(f.col("groupIndicator")).over(w.rangeBetween(Window.unboundedPreceding, 0)))
df_1.createOrReplaceTempView("GroupedNumbered")
# Selecting only the needed columns and shifting the group so EventID starts at 1
df_final = spark.sql("select value.ID, value.Event, secondgroup + 1 as EventID from GroupedNumbered order by index")
df_final.show(30)

The result produced

+---+-----+-------+
| ID|Event|EventID|
+---+-----+-------+
|  1|  run|      1|
|  1|  run|      1|
|  1|  run|      1|
|  1| swim|      1|
|  1|  run|      2|
|  1| swim|      2|
|  1| swim|      2|
|  1|  run|      3|
|  1|  run|      3|
|  1|  run|      3|
|  1| swim|      3|
|  1|  run|      4|
|  1|  run|      4|
|  2|  run|      1|
|  2|  run|      1|
|  2|  run|      1|
|  2| swim|      1|
|  2|  run|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2| swim|      2|
|  2|  run|      3|
|  3|  run|      1|
|  3|  run|      1|
|  3| swim|      1|
+---+-----+-------+
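
If the dataset is too large for the global row-number windows above, one possible variant (only a sketch, building on the df_1 DataFrame with its value struct and index columns created above, and not tested at scale) is to partition every window by ID and express the swim-to-run boundary directly with lag, so that only the zipWithIndex step is global:

import pyspark.sql.functions as f
from pyspark.sql.window import Window

# Per-ID window ordered by the original line number; rows of different IDs
# can then be processed in parallel instead of on a single executor.
w_id = Window.partitionBy('value.ID').orderBy('index')

df_alt = (
    df_1
    # A new EventID starts when the previous Event was 'swim' and the
    # current Event is 'run' (the rule stated in the question).
    .withColumn(
        'boundary',
        ((f.lag('value.Event').over(w_id) == 'swim') &
         (f.col('value.Event') == 'run')).cast('int'))
    .fillna(0, subset=['boundary'])
    # Running sum of the boundary flags per ID, shifted so EventID starts at 1.
    .withColumn('EventID', f.sum('boundary').over(w_id) + 1)
    .orderBy('index')              # global sort here is only for display purposes
    .select('value.ID', 'value.Event', 'EventID')
)
df_alt.show(30)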
