首页 > 解决方案 > 在 pyspark 中,根据两列值、变量和连续负值生成窗口分区的最小值

问题描述

创建了一个 rdd,具有列“a”,其中包含正值和负值

df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],
                   "Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],
                   "a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2],
                   "pos_neg": ['false','true','false','true','true','true','true','true','false','false','true','false','false','false','true','false','false','false','true','true'],
                   "neg_val_count":[0,1,1,2,1,1,1,1,1,0,1,1,1,1,2,2,2,2,3,3]})

df2=spark.createDataFrame(df) 

“pos_neg”列表示“a”中的字段是正还是负,如果为负则为真。“neg_val_count”是变量“b”字段中负值的计数器。每次变量 'b' 改变计数器复位和连续的负值被视为单一。因此,对于变量“B”(在“b”列中),即使存在三个负值,计数器也是一个。

我想生成一个列,该列将具有“b”(例如A)中的变量和“a”中的值(对于两个假之间的真情况)的组合的最小值。例如,'A' 和 true 的第一个组合,值将是 -4(它被 false 包围),'A' 和 true 值的第二个组合将是 -1,对于 B,有三个连续的 true 所以值将是其中最小的 -7。基本上连续的负值被取为一,最小值被取出。期望值是指所需的结果

    b  Sno  a pos_neg  neg_val_count   expected value
0   A    1  3   false              0        3
1   A    2 -4    true              1       -4
2   A    3  2   false              1        2
3   A    4 -1    true              2       -1
4   B    5 -3    true              1       -7
5   B    6 -1    true              1       -7
6   B    7 -7    true              1       -7
7   C    8 -6    true              1       -6
8   C    9  1   false              1        1
9   D   10  1   false              0        1
10  D   11 -1    true              1       -1
11  D   12  1   false              1        1
12  D   13  4   false              1        4
13  D   14  5   false              1        5
14  D   15 -3    true              2       -3
15  D   16  2   false              2        2
16  D   17  3   false              2        3
17  D   18  4   false              2        4
18  D   19 -1    true              3       -2
19  D   20 -2    true              3       -2

我尝试使用以下方法,但它不起作用,这方面的任何支持都会很棒。

w3 = Window.partitionBy('b','pos_neg').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')

df2.withColumn('new_col', F.min('a').over(w3))

标签: python-3.xgroup-bypyspark

解决方案


推荐阅读