首页 > 解决方案 > PySpark:如何根据事件创建自上次事件计数器和唯一标识符以来的时间?

问题描述

我有一个看起来像这样的数据框。要在标准熊猫中进行所需的操作,我会执行以下操作:

import pandas as pd
case = pd.Series(['A', 'A', 'A', 'A',
                  'B', 'B', 'B', 'B',
                  'C', 'C', 'C', 'C'])
y = pd.Series([0, 1, 0, 0,
               0, 1, 0, 0,
               0, 0, 1, 0])
year = [2016, 2017, 2018, 2019,
        2016, 2017, 2018, 2019,
        2016, 2017, 2018, 2019]

dict = {'case': case, 'y': y, 'year': year}
df = pd.DataFrame(dict)

# the transformations of interest
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()

我正在寻求有关如何将这两个命令转换为 PySpark 数据帧的帮助。

df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()

预期输出如下所示:


  case  y   year    case_id counter
    A   0   2016    1        0
    A   1   2017    1        1
    A   0   2018    2        0
    A   0   2019    2        1
    B   0   2016    3        0
    B   1   2017    3        1
    B   0   2018    4        0
    B   0   2019    4        1
    C   0   2016    5        0
    C   0   2017    5        1
    C   1   2018    5        2
    C   0   2019    6        0

标签: pythonpysparkpyspark-dataframes

解决方案


这几乎就像一个常见问题解答,另请参阅我的旧帖子中的另一个示例。对于此示例,您可以尝试以下操作:

from pyspark.sql import functions as F
from pyspark.sql import Window

pdf = spark.createDataFrame(df)

w1 = Window.partitionBy().orderBy('case', 'year')
w2 = Window.partitionBy('case_id').orderBy('case', 'year')

df_new = pdf.withColumn("case_id", F.sum(F.when(~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1),1).otherwise(0)).over(w1)+1) \
    .withColumn('counter', F.count('*').over(w2)-1)

df_new.show()
+----+---+----+-------+-------+
|case|  y|year|case_id|counter|
+----+---+----+-------+-------+
|   A|  0|2016|      1|      0|
|   A|  1|2017|      1|      1|
|   A|  0|2018|      2|      0|
|   A|  0|2019|      2|      1|
|   B|  0|2016|      3|      0|
|   B|  1|2017|      3|      1|
|   B|  0|2018|      4|      0|
|   B|  0|2019|      4|      1|
|   C|  0|2016|      5|      0|
|   C|  0|2017|      5|      1|
|   C|  1|2018|      5|      2|
|   C|  0|2019|      6|      0|
+----+---+----+-------+-------+

在哪里:

  1. 设置 WindSpecw1以按 对行进行排序caseyear然后使用lag函数查找上一个值(类似于pandas 中的shift)。

    pandas: (~(df.case == df.case.shift())) | (df.y.shift()==1) 
    pyspark: ~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1)
    

    注意: (1) orderBy inw1很重要,因为partitionBy会触发数据混洗,否则无法保证结果行的顺序。(2) 注意使用滞后函数的值,如果需要,使用滞后函数或合并函数的第三个参数设置默认值。

  2. 用于F.when(..,1).otherwise(0)将 (1) 的结果从boolean转换为int,然后执行cumsum

    pandas: df.c.cumsum()
    pyspark: F.sum(c).over(w1)+1
    
  3. case_id添加到 partitionBy 以设置w2然后执行cumcount(无需再执行一次cumsumgroupby

    pandas: df.groupby(..).cumcount()
    pyspark: F.count('*').over(w2)-1
    

对于大型数据框,设置 WinSpec withoutpartitionBy会将所有数据移动到单个分区中,这可能会产生OOM错误。事实上,如果您只是在case + case_idcumcount的每个组合中寻找,您更有可能执行以下操作:

w1 = Window.partitionBy('case').orderBy('year')
w2 = Window.partitionBy('case', 'case_id').orderBy('year')

df_new = pdf.withColumn("case_id", F.sum(F.when(F.lag("y",1,0).over(w1) == 1,1).otherwise(0)).over(w1)) \
    .withColumn('counter', F.count('*').over(w2)-1)

df_new.show()
+----+---+----+-------+-------+
|case|  y|year|case_id|counter|
+----+---+----+-------+-------+
|   B|  0|2016|      0|      0|
|   B|  1|2017|      0|      1|
|   B|  0|2018|      1|      0|
|   B|  0|2019|      1|      1|
|   C|  0|2016|      0|      0|
|   C|  0|2017|      0|      1|
|   C|  1|2018|      0|      2|
|   C|  0|2019|      1|      0|
|   A|  0|2016|      0|      0|
|   A|  1|2017|      0|      1|
|   A|  0|2018|      1|      0|
|   A|  0|2019|      1|      1|
+----+---+----+-------+-------+

推荐阅读