首页 > 解决方案 > pyspark计算当前时间和上次活动时间之间差异的移动平均值

问题描述

我有一些这样的记录。

A    B
1    2018-12-25
2    2019-01-15
1    2019-01-20
3    2018-01-01
2    2019-01-01
4    2018-04-09
3    2018-11-08
1    2018-03-20

我想要得到的是这样的东西。第一步,在组内按升序排列。(不需要A订购)

A    B
1    2018-03-20
1    2018-12-25
1    2019-01-20
3    2018-01-01
3    2018-11-08
2    2019-01-01
2    2019-01-15
4    2018-04-09

第二步,获取组内连续行之间的时间差。

A    B            C
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   26
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN

第三步,得到窗口大小为 2 的 C 的移动平均值。(由于我只提供了很少的行作为示例,为了方便,选择大小 2 即可)

A    B            C     moving_avg
1    2018-03-20   NaN   NaN
1    2018-12-25   280   280
1    2019-01-20   26    153
3    2018-01-01   NaN   NaN
3    2018-11-08   311   311
2    2019-01-01   NaN   NaN
2    2019-01-15   14    14
4    2018-04-09   NaN   NaN

如果 Windows 函数可以处理这种情况,则该解决方案实际上不需要生成 C 列。我列出每个步骤只是为了确保您可以清楚地了解问题所在。

结果集将如下所示

A    B            moving_avg
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   153
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN

注意:这是在 pyspark 上并使用数据框。不在使用 Pandas 的 Python 上。

十分感谢!

标签: pythonpysparkmoving-average

解决方案


可能有更聪明的方法来实现这一点,但您也可以使用 RDD :

from operator import add
from numpy import mean
from datetime import datetime

data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),
        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]
data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)

def computeMvgAvg(values):
sorted_date = sorted(values)
diffs = []
mvg_avg = []
for i in range(1, len(sorted_date)):
    diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))
    mvg_avg.append(int(mean(diffs)))
diffs = [None] + diffs
mvg_avg = [None] + mvg_avg
return zip(sorted_date, diffs, mvg_avg)

sch = StructType([
   StructField("A", StringType(), True),
   StructField("B", DateType(), True),
   StructField("C", IntegerType(), True),
   StructField("moving_avg", IntegerType(), True)
])
data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()

+---+----------+----+----------+
|  A|         B|   C|moving_avg|
+---+----------+----+----------+
|  1|2018-03-20|null|      null|
|  1|2018-12-25| 280|       280|
|  1|2019-01-20|  26|       153|
|  2|2019-01-01|null|      null|
|  2|2019-01-15|  14|        14|
|  3|2018-01-01|null|      null|
|  3|2018-11-08| 311|       311|
|  4|2018-04-09|null|      null|
+---+----------+----+----------+

推荐阅读