python - pyspark计算当前时间和上次活动时间之间差异的移动平均值
问题描述
我有一些这样的记录。
A B
1 2018-12-25
2 2019-01-15
1 2019-01-20
3 2018-01-01
2 2019-01-01
4 2018-04-09
3 2018-11-08
1 2018-03-20
我想要得到的是这样的东西。第一步,在组内按升序排列。(不需要A订购)
A B
1 2018-03-20
1 2018-12-25
1 2019-01-20
3 2018-01-01
3 2018-11-08
2 2019-01-01
2 2019-01-15
4 2018-04-09
第二步,获取组内连续行之间的时间差。
A B C
1 2018-03-20 NaN
1 2018-12-25 280
1 2019-01-20 26
3 2018-01-01 NaN
3 2018-11-08 311
2 2019-01-01 NaN
2 2019-01-15 14
4 2018-04-09 NaN
第三步,得到窗口大小为 2 的 C 的移动平均值。(由于我只提供了很少的行作为示例,为了方便,选择大小 2 即可)
A B C moving_avg
1 2018-03-20 NaN NaN
1 2018-12-25 280 280
1 2019-01-20 26 153
3 2018-01-01 NaN NaN
3 2018-11-08 311 311
2 2019-01-01 NaN NaN
2 2019-01-15 14 14
4 2018-04-09 NaN NaN
如果 Windows 函数可以处理这种情况,则该解决方案实际上不需要生成 C 列。我列出每个步骤只是为了确保您可以清楚地了解问题所在。
结果集将如下所示
A B moving_avg
1 2018-03-20 NaN
1 2018-12-25 280
1 2019-01-20 153
3 2018-01-01 NaN
3 2018-11-08 311
2 2019-01-01 NaN
2 2019-01-15 14
4 2018-04-09 NaN
注意:这是在 pyspark 上并使用数据框。不在使用 Pandas 的 Python 上。
十分感谢!
解决方案
可能有更聪明的方法来实现这一点,但您也可以使用 RDD :
from operator import add
from numpy import mean
from datetime import datetime
data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),
(2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]
data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)
def computeMvgAvg(values):
sorted_date = sorted(values)
diffs = []
mvg_avg = []
for i in range(1, len(sorted_date)):
diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))
mvg_avg.append(int(mean(diffs)))
diffs = [None] + diffs
mvg_avg = [None] + mvg_avg
return zip(sorted_date, diffs, mvg_avg)
sch = StructType([
StructField("A", StringType(), True),
StructField("B", DateType(), True),
StructField("C", IntegerType(), True),
StructField("moving_avg", IntegerType(), True)
])
data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()
+---+----------+----+----------+
| A| B| C|moving_avg|
+---+----------+----+----------+
| 1|2018-03-20|null| null|
| 1|2018-12-25| 280| 280|
| 1|2019-01-20| 26| 153|
| 2|2019-01-01|null| null|
| 2|2019-01-15| 14| 14|
| 3|2018-01-01|null| null|
| 3|2018-11-08| 311| 311|
| 4|2018-04-09|null| null|
+---+----------+----+----------+
推荐阅读
- swift - 如何指定在 Swift 中使用泛型重载函数?
- c++ - 如何避免 nullchar 转换为 0?
- r - 两年值之间的比率
- python-3.x - Pandas Dataframe 显示问题 IBM Watson Notebook
- jquery - 使用jquery在laravel中单击按钮时如何获取foreach中数组的值索引
- spring - How not expire cache if it can't retrieve data
- ruby-on-rails - 通过 imageMagic 和 Carrierwave 获得精确尺寸
- c# - 计算交换次数和比较:选择排序
- python - 使用 pywin32 打开 powerpoint 文件时窗口不活动
- angular - 当用户转到下一页时加载数据