首页 > 解决方案 > 在 pyspark 中,基于变量字段进行分组,并为特定值添加一个计数器(在变量更改时重置)

问题描述

从熊猫数据框创建火花数据框

import pandas as pd
df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})

df2=spark.createDataFrame(df) 

接下来我在字段“b”上使用窗口分区

from pyspark.sql import window
win_spec = (window.Window.partitionBy(['b']).orderBy("Sno").rowsBetween(window.Window.unboundedPreceding, 0))

根据值添加一个字段 positive ,negative 并创建一个 lambda 函数

df2 = df2.withColumn("pos_neg",col("a") <0)
pos_neg_func =udf(lambda x: ((x) & (x != x.shift())).cumsum())

尝试创建一个新列(这是一个负值计数器,但在变量'b'内。所以当'b'中的字段发生变化时计数器重新启动。此外,如果有连续的-ve值,它们应该被视为单个值,计数器变化 1

df3 = (df2.select('pos_neg',pos_neg_func('pos_neg').alias('val')))

我希望输出为,

      b  Sno  a    val  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   B    5 -3   True      1
5   B    6 -1   True      1
6   B    7 -7   True      1
7   C    8 -6   True      1
8   C    9  1  False      1
9   D   10  1  False      0
10  D   11 -1   True      1
11  D   12  1  False      1
12  D   13  4  False      1
13  D   14  5  False      1
14  D   15 -3   True      2
15  D   16  2  False      2
16  D   17  3  False      2
17  D   18  4  False      2
18  D   19 -1   True      3
19  D   20 -2   True      3

在 python 中,一个简单的函数如下所示:

df['val'] = df.groupby('b')['pos_neg'].transform(lambda x: ((x) & (x != x.shift())).cumsum())

josh-friedlander 在上面的代码中提供了支持

标签: python-3.xpysparkapache-spark-sqluser-defined-functions

解决方案


Pyspark 没有移位功能,但您可以使用滞后窗口功能,它为您提供当前行之前的行。第一个窗口(称为 w)将列的值设置val为 1,如果该pos_negTrue的值是并且前一个的值pos_negFalse,否则设置为 0。通过第二个窗口(称为 w2),我们计算累积总和以获得您想要的

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window

df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})

df2=spark.createDataFrame(df) 

w = Window.partitionBy('b').orderBy('Sno')
w2 = Window.partitionBy('b').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')

df2 = df2.withColumn("pos_neg",col("a") <0)

df2 = df2.withColumn('val', F.when((df2.pos_neg == True) & (F.lag('pos_neg', default=False).over(w) == False), 1).otherwise(0))
df2 = df2.withColumn('val',  F.sum('val').over(w2))

df2.show()

输出:

+---+---+---+-------+---+ 
|Sno|  a|  b|pos_neg|val| 
+---+---+---+-------+---+ 
|  5| -3|  B|   true|  1| 
|  6| -1|  B|   true|  1| 
|  7| -7|  B|   true|  1| 
| 10|  1|  D|  false|  0| 
| 11| -1|  D|   true|  1| 
| 12|  1|  D|  false|  1| 
| 13|  4|  D|  false|  1| 
| 14|  5|  D|  false|  1| 
| 15| -3|  D|   true|  2| 
| 16|  2|  D|  false|  2| 
| 17|  3|  D|  false|  2| 
| 18|  4|  D|  false|  2| 
| 19| -1|  D|   true|  3| 
| 20| -2|  D|   true|  3| 
|  8| -6|  C|   true|  1| 
|  9|  1|  C|  false|  1| 
|  1|  3|  A|  false|  0| 
|  2| -4|  A|   true|  1| 
|  3|  2|  A|  false|  1| 
|  4| -1|  A|   true|  2| 
+---+---+---+-------+---+

您可能想知道为什么需要一个允许我们对数据集进行排序的列。让我试着用一个例子来解释这一点。以下数据由 pandas 读取并分配了索引(左列)。您想计算Truein 的出现次数pos_neg,而不想计算连续True的 's 。此逻辑导致val2列如下所示:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   A    5 -5   True      2

...但这取决于它从 pandas 获得的索引(行顺序)。当您更改行的顺序(以及相应的 pandas 索引)时,当您将逻辑应用于相同的行时,您将得到不同的结果,只是因为顺序不同:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    3  2  False      0
2   A    2 -4   True      1
3   A    4 -1   True      1
4   A    5 -5   True      1

您会看到行的顺序很重要。您现在可能想知道为什么 pyspark 不像 pandas 那样创建索引。这是因为 spark 将您的数据保存在多个分区中,这些分区分布在您的集群上,并且取决于您的数据源,甚至能够分布式读取数据。因此,在读取数据期间不能添加索引。您可以在使用monotonically_increasing_id函数读取数据后添加一个,但由于读取过程,您的数据与数据源相比可能已经具有不同的顺序。

您的sno专栏避免了这个问题,并保证您将始终获得相同数据的相同结果(确定性)。


推荐阅读