Removing trailing 0s from multiple time series

Problem description

I have multiple time series stored in a Spark DataFrame, as follows:

df = spark.createDataFrame([('2020-03-10', 'France', 19),
                            ('2020-03-11', 'France', 22),
                            ('2020-03-12', 'France', 0),
                            ('2020-03-13', 'France', 0),
                            ('2020-03-14', 'France', 0),
                            ('2020-04-10', 'UK', 12),
                            ('2020-04-11', 'UK', 0),
                            ('2020-04-12', 'UK', 9),
                            ('2020-04-13', 'UK', 0),
                            ('2020-04-08', 'Japan', 0),
                            ('2020-04-09', 'Japan', -3),
                            ('2020-04-10', 'Japan', -2)
                           ],
                           ['date', 'country', 'y']
                           )

I am looking for a way (without looping, since my real DataFrame has millions of rows) to remove the 0s at the end of each time series.

In our example, we would obtain:

df = spark.createDataFrame([('2020-03-10', 'France', 19),
                            ('2020-03-11', 'France', 22),
                            ('2020-04-10', 'UK', 12),
                            ('2020-04-11', 'UK', 0),
                            ('2020-04-12', 'UK', 9),
                            ('2020-04-08', 'Japan', 0),
                            ('2020-04-09', 'Japan', -3),
                            ('2020-04-10', 'Japan', -2)
                           ],
                           ['date', 'country', 'y']
                           )

Tags: python, dataframe, pyspark

Solution


Assuming you want to remove the trailing 0s of each country's series, with rows ordered by date:

# Assumes an active SparkSession named `spark`
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('2020-03-10', 'France', 19),
                            ('2020-03-11', 'France', 22),
                            ('2020-03-12', 'France', 0),
                            ('2020-03-13', 'France', 0),
                            ('2020-03-14', 'France', 0),
                            ('2020-04-10', 'UK', 12),
                            ('2020-04-11', 'UK', 0),
                            ('2020-04-12', 'UK', 9),
                            ('2020-04-13', 'UK', 0),
                            ('2020-04-13', 'India', 1),
                            ('2020-04-14', 'India', 0),
                            ('2020-04-15', 'India', 0),
                            ('2020-04-16', 'India', 1),
                            ('2020-04-08', 'Japan', 0),
                            ('2020-04-09', 'Japan', -3),
                            ('2020-04-10', 'Japan', -2)
                           ],
                           ['date', 'country', 'y']
                           )
# Take absolute values so positive and negative values cannot cancel out to 0
df = df.withColumn('y1', F.abs(F.col('y')))
# Order each country's rows from latest to earliest so the trailing rows come first
w = Window.partitionBy('country').orderBy(F.col('date').desc())
# Running sum over that window: it stays 0 across the trailing zeros
# and becomes positive from the last non-zero value onward
df_sum = df.withColumn('sum_chk', F.sum('y1').over(w))
# Keep rows whose running sum is non-zero; the sort is only for display
df_res = df_sum.where('sum_chk != 0').orderBy('date', ascending=True)

Result:

df_res.show()
+----------+-------+---+---+-------+
|      date|country|  y| y1|sum_chk|
+----------+-------+---+---+-------+
|2020-03-10| France| 19| 19|     41|
|2020-03-11| France| 22| 22|     22|
|2020-04-08|  Japan|  0|  0|      5|
|2020-04-09|  Japan| -3|  3|      5|
|2020-04-10|  Japan| -2|  2|      2|
|2020-04-10|     UK| 12| 12|     21|
|2020-04-11|     UK|  0|  0|      9|
|2020-04-12|     UK|  9|  9|      9|
|2020-04-13|  India|  1|  1|      2|
|2020-04-14|  India|  0|  0|      1|
|2020-04-15|  India|  0|  0|      1|
|2020-04-16|  India|  1|  1|      1|
+----------+-------+---+---+-------+
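
The running-sum logic can also be checked on the same sample data without a Spark session. What follows is only a plain-Python sketch for illustration: the helper name `drop_trailing_zeros` is hypothetical, and an explicit loop like this is not meant for millions of rows. It mirrors the three steps of the answer: take absolute values, compute a running sum per country from the latest date backwards, and keep the rows where that sum is non-zero.

```python
from itertools import accumulate

rows = [
    ('2020-03-10', 'France', 19), ('2020-03-11', 'France', 22),
    ('2020-03-12', 'France', 0), ('2020-03-13', 'France', 0),
    ('2020-03-14', 'France', 0),
    ('2020-04-10', 'UK', 12), ('2020-04-11', 'UK', 0),
    ('2020-04-12', 'UK', 9), ('2020-04-13', 'UK', 0),
    ('2020-04-13', 'India', 1), ('2020-04-14', 'India', 0),
    ('2020-04-15', 'India', 0), ('2020-04-16', 'India', 1),
    ('2020-04-08', 'Japan', 0), ('2020-04-09', 'Japan', -3),
    ('2020-04-10', 'Japan', -2),
]


def drop_trailing_zeros(rows):
    """Hypothetical helper: drop the trailing zeros of each country's series."""
    # Group rows by country, like Window.partitionBy('country')
    by_country = {}
    for date, country, y in rows:
        by_country.setdefault(country, []).append((date, y))

    kept = []
    for country, series in by_country.items():
        # Latest date first, like orderBy(F.col('date').desc())
        series.sort(key=lambda r: r[0], reverse=True)
        # Running sum of |y|, like F.sum('y1').over(w)
        running = accumulate(abs(y) for _, y in series)
        # A running sum of 0 means only trailing zeros have been seen so far
        kept.extend((date, country, y)
                    for (date, y), total in zip(series, running)
                    if total != 0)
    # Sort by date (then country) just for viewing
    return sorted(kept)


result = drop_trailing_zeros(rows)
for row in result:
    print(row)
```

This keeps the same 12 rows as the table above: zeros in the middle of a series (UK, India) and at the start (Japan) survive, and only the trailing ones are removed. In the Spark version, the two helper columns can be removed afterwards with `df_res.drop('y1', 'sum_chk')` if only `date`, `country` and `y` are wanted.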
