python-3.x - 在 pyspark 中,基于变量字段进行分组,并为特定值添加一个计数器(在变量更改时重置)
问题描述
从熊猫数据框创建火花数据框
import pandas as pd
df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})
df2=spark.createDataFrame(df)
接下来我在字段“b”上使用窗口分区
from pyspark.sql import window
win_spec = (window.Window.partitionBy(['b']).orderBy("Sno").rowsBetween(window.Window.unboundedPreceding, 0))
根据值添加一个字段 positive ,negative 并创建一个 lambda 函数
df2 = df2.withColumn("pos_neg",col("a") <0)
pos_neg_func =udf(lambda x: ((x) & (x != x.shift())).cumsum())
尝试创建一个新列(这是一个负值计数器,但在变量'b'内。所以当'b'中的字段发生变化时计数器重新启动。此外,如果有连续的-ve值,它们应该被视为单个值,计数器变化 1
df3 = (df2.select('pos_neg',pos_neg_func('pos_neg').alias('val')))
我希望输出为,
b Sno a val val_2
0 A 1 3 False 0
1 A 2 -4 True 1
2 A 3 2 False 1
3 A 4 -1 True 2
4 B 5 -3 True 1
5 B 6 -1 True 1
6 B 7 -7 True 1
7 C 8 -6 True 1
8 C 9 1 False 1
9 D 10 1 False 0
10 D 11 -1 True 1
11 D 12 1 False 1
12 D 13 4 False 1
13 D 14 5 False 1
14 D 15 -3 True 2
15 D 16 2 False 2
16 D 17 3 False 2
17 D 18 4 False 2
18 D 19 -1 True 3
19 D 20 -2 True 3
在 python 中,一个简单的函数如下所示:
df['val'] = df.groupby('b')['pos_neg'].transform(lambda x: ((x) & (x != x.shift())).cumsum())
josh-friedlander 在上面的代码中提供了支持
解决方案
Pyspark 没有移位功能,但您可以使用滞后窗口功能,它为您提供当前行之前的行。第一个窗口(称为 w)将列的值设置val
为 1,如果该pos_neg
列True
的值是并且前一个的值pos_neg
是False
,否则设置为 0。通过第二个窗口(称为 w2),我们计算累积总和以获得您想要的
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})
df2=spark.createDataFrame(df)
w = Window.partitionBy('b').orderBy('Sno')
w2 = Window.partitionBy('b').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')
df2 = df2.withColumn("pos_neg",col("a") <0)
df2 = df2.withColumn('val', F.when((df2.pos_neg == True) & (F.lag('pos_neg', default=False).over(w) == False), 1).otherwise(0))
df2 = df2.withColumn('val', F.sum('val').over(w2))
df2.show()
输出:
+---+---+---+-------+---+
|Sno| a| b|pos_neg|val|
+---+---+---+-------+---+
| 5| -3| B| true| 1|
| 6| -1| B| true| 1|
| 7| -7| B| true| 1|
| 10| 1| D| false| 0|
| 11| -1| D| true| 1|
| 12| 1| D| false| 1|
| 13| 4| D| false| 1|
| 14| 5| D| false| 1|
| 15| -3| D| true| 2|
| 16| 2| D| false| 2|
| 17| 3| D| false| 2|
| 18| 4| D| false| 2|
| 19| -1| D| true| 3|
| 20| -2| D| true| 3|
| 8| -6| C| true| 1|
| 9| 1| C| false| 1|
| 1| 3| A| false| 0|
| 2| -4| A| true| 1|
| 3| 2| A| false| 1|
| 4| -1| A| true| 2|
+---+---+---+-------+---+
您可能想知道为什么需要一个允许我们对数据集进行排序的列。让我试着用一个例子来解释这一点。以下数据由 pandas 读取并分配了索引(左列)。您想计算True
in 的出现次数pos_neg
,而不想计算连续True
的 's 。此逻辑导致val2
列如下所示:
b Sno a pos_neg val_2
0 A 1 3 False 0
1 A 2 -4 True 1
2 A 3 2 False 1
3 A 4 -1 True 2
4 A 5 -5 True 2
...但这取决于它从 pandas 获得的索引(行顺序)。当您更改行的顺序(以及相应的 pandas 索引)时,当您将逻辑应用于相同的行时,您将得到不同的结果,只是因为顺序不同:
b Sno a pos_neg val_2
0 A 1 3 False 0
1 A 3 2 False 0
2 A 2 -4 True 1
3 A 4 -1 True 1
4 A 5 -5 True 1
您会看到行的顺序很重要。您现在可能想知道为什么 pyspark 不像 pandas 那样创建索引。这是因为 spark 将您的数据保存在多个分区中,这些分区分布在您的集群上,并且取决于您的数据源,甚至能够分布式读取数据。因此,在读取数据期间不能添加索引。您可以在使用monotonically_increasing_id函数读取数据后添加一个,但由于读取过程,您的数据与数据源相比可能已经具有不同的顺序。
您的sno
专栏避免了这个问题,并保证您将始终获得相同数据的相同结果(确定性)。
推荐阅读
- navigation - A* 或 Dijkstra 对行人导航系统更有效吗?
- c - 为什么数组元素中的元素会移位?
- vue.js - 如何在 vue-router 中为嵌套路由使用相同的组件
- mysql - 选择小于上一个的值
- javascript - 在没有 for 或 while 循环的情况下获取字符串中每个重叠出现的索引
- c++ - 使用删除时堆损坏
- java - Android为前置摄像头提供错误格式
- javascript - 使用 React-Multi-Carousel 反应轮播
- c++ - 仅在少数方法(静态或共享库)中未解析的外部符号
- c - opencv 程序中的错误:输入参数的大小不匹配