python - PySpark:如何根据事件创建自上次事件计数器和唯一标识符以来的时间?
问题描述
我有一个看起来像这样的数据框。要在标准熊猫中进行所需的操作,我会执行以下操作:
import pandas as pd
case = pd.Series(['A', 'A', 'A', 'A',
'B', 'B', 'B', 'B',
'C', 'C', 'C', 'C'])
y = pd.Series([0, 1, 0, 0,
0, 1, 0, 0,
0, 0, 1, 0])
year = [2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019,
2016, 2017, 2018, 2019]
dict = {'case': case, 'y': y, 'year': year}
df = pd.DataFrame(dict)
# the transformations of interest
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
我正在寻求有关如何将这两个命令转换为 PySpark 数据帧的帮助。
df['case_id'] = ((~(df.case == df.case.shift())) | (df.y.shift()==1)).cumsum()
df['counter'] = df.groupby(((df['case_id'] != df['case_id'].shift(1))).cumsum()).cumcount()
预期输出如下所示:
case y year case_id counter
A 0 2016 1 0
A 1 2017 1 1
A 0 2018 2 0
A 0 2019 2 1
B 0 2016 3 0
B 1 2017 3 1
B 0 2018 4 0
B 0 2019 4 1
C 0 2016 5 0
C 0 2017 5 1
C 1 2018 5 2
C 0 2019 6 0
解决方案
这几乎就像一个常见问题解答,另请参阅我的旧帖子中的另一个示例。对于此示例,您可以尝试以下操作:
from pyspark.sql import functions as F
from pyspark.sql import Window
pdf = spark.createDataFrame(df)
w1 = Window.partitionBy().orderBy('case', 'year')
w2 = Window.partitionBy('case_id').orderBy('case', 'year')
df_new = pdf.withColumn("case_id", F.sum(F.when(~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1),1).otherwise(0)).over(w1)+1) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| A| 0|2016| 1| 0|
| A| 1|2017| 1| 1|
| A| 0|2018| 2| 0|
| A| 0|2019| 2| 1|
| B| 0|2016| 3| 0|
| B| 1|2017| 3| 1|
| B| 0|2018| 4| 0|
| B| 0|2019| 4| 1|
| C| 0|2016| 5| 0|
| C| 0|2017| 5| 1|
| C| 1|2018| 5| 2|
| C| 0|2019| 6| 0|
+----+---+----+-------+-------+
在哪里:
设置 WindSpec
w1
以按 对行进行排序case
,year
然后使用lag函数查找上一个值(类似于pandas 中的shift)。pandas: (~(df.case == df.case.shift())) | (df.y.shift()==1) pyspark: ~(F.col("case") == F.lag("case").over(w1)) | (F.lag("y",1,0).over(w1) == 1)
注意: (1) orderBy in
w1
很重要,因为partitionBy会触发数据混洗,否则无法保证结果行的顺序。(2) 注意使用滞后函数的空值,如果需要,使用滞后函数或合并函数的第三个参数设置默认值。用于
F.when(..,1).otherwise(0)
将 (1) 的结果从boolean转换为int,然后执行cumsum
:pandas: df.c.cumsum() pyspark: F.sum(c).over(w1)+1
将case_id添加到 partitionBy 以设置
w2
然后执行cumcount
(无需再执行一次cumsum
)groupby
:pandas: df.groupby(..).cumcount() pyspark: F.count('*').over(w2)-1
对于大型数据框,设置 WinSpec withoutpartitionBy
会将所有数据移动到单个分区中,这可能会产生OOM错误。事实上,如果您只是在case + case_idcumcount
的每个组合中寻找,您更有可能执行以下操作:
w1 = Window.partitionBy('case').orderBy('year')
w2 = Window.partitionBy('case', 'case_id').orderBy('year')
df_new = pdf.withColumn("case_id", F.sum(F.when(F.lag("y",1,0).over(w1) == 1,1).otherwise(0)).over(w1)) \
.withColumn('counter', F.count('*').over(w2)-1)
df_new.show()
+----+---+----+-------+-------+
|case| y|year|case_id|counter|
+----+---+----+-------+-------+
| B| 0|2016| 0| 0|
| B| 1|2017| 0| 1|
| B| 0|2018| 1| 0|
| B| 0|2019| 1| 1|
| C| 0|2016| 0| 0|
| C| 0|2017| 0| 1|
| C| 1|2018| 0| 2|
| C| 0|2019| 1| 0|
| A| 0|2016| 0| 0|
| A| 1|2017| 0| 1|
| A| 0|2018| 1| 0|
| A| 0|2019| 1| 1|
+----+---+----+-------+-------+
推荐阅读
- java - 动画汽车标记运动在停止时返回北方
- javascript - 尝试使用 fetchSignInMethodForEmail
- shell - 如何将外壳分叉到前台而不是后台?
- maven - 带有 Gitlab CI 的 Fabric8io 插件
- java - 时间跨度解析在前一年失败
- c# - ConvertApi 不遵循 HTTP 302 重定向
- python - 在 html/javascript 网页中获取 python 数据
- c++ - 用 2 个分隔符分隔输入文件
- pandas - 从熊猫数据框中创建词袋
- python - 为什么 Keras 看不到我的 GPU 而 TensorFlow 可以?