python - Pyspark 5分钟滑动窗口求和
问题描述
我有这样的数据:
('2017-02-03', '22:57:00')
('2017-02-03', '23:02:00')
('2017-02-04', '09:56:00')
('2017-02-04', '10:01:00')
('2017-02-04', '10:06:00')
('2017-02-04', '10:11:00')
('2017-02-04', '10:16:00')
('2017-02-04', '10:21:00')
('2017-02-04', '10:26:00')
('2017-02-04', '10:31:00')
('2017-02-04', '10:36:00')
('2017-02-04', '16:57:00')
('2017-02-04', '17:12:00')
我想要做的是比较每个日期的时间,看看是否有 5 分钟的差异。如果有五分钟的差异,我会数一数他们连续有多少。这将产生如下结果:
('2017-02-03', '22:57:00') <- 1
('2017-02-03', '23:02:00') <- 2
('2017-02-04', '09:56:00') <- 1
('2017-02-04', '10:01:00') <- 2
('2017-02-04', '10:06:00') <- 3
('2017-02-04', '10:11:00') <- 4
('2017-02-04', '10:16:00') <- 5
('2017-02-04', '10:21:00') <- 6
('2017-02-04', '10:26:00') <- 7
('2017-02-04', '10:31:00') <- 8
('2017-02-04', '10:36:00') <- 9
('2017-02-04', '16:57:00') <- 1
('2017-02-04', '17:12:00') <- 1
最终:
('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))
到目前为止,这是我的代码
def check_interval(values, measurement):
start_date = ""
start_time = ""
counter = 1
res = ""
for index, val in enumerate(values):
if index + 1 == len(values):
break
date1, time1 = get_date_time(val)
date2, time2 = get_date_time(values[index + 1])
start_date = date1
if counter == 1:
start_time = time1
date_time1 = ' '.join(val)
date_time2 = ' '.join(values[index + 1])
time_diff = subtract_time(date_time1, date_time2)
if time_diff > timedelta(minutes=measurement):
res = start_date + "\t(" + start_time + ", " + str(counter) + ")\n"
print(res)
counter = 1
else:
counter += 1
if date1 != date2:
start_date = date2
# ------------------------------------------
# FUNCTION my_main
# ------------------------------------------
def my_main(sc, my_dataset_dir, station_name, measurement_time):
inputRDD = sc.textFile(my_dataset_dir)
stationRDD = inputRDD \
.map(process_line) \
.filter(lambda line: (line[0] == '0' and line[1] == station_name and line[5] == '0')) \
.map(lambda date_time: date_time[4]) \
.map(split_date_time) \
.sortByKey() \
.collect()
check_interval(stationRDD, measurement_time)
我有我想要的结果,但我想知道是否可以使用 pyspark 函数来实现这一点?并产生输出:
('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))
解决方案
您可以将数据框 API 与window
函数一起使用:
import pyspark.sql.functions as psf
from pyspark.sql import Window
w = Window.orderBy('datetime')
df \
.withColumn('datetime', psf.unix_timestamp(psf.concat('date', psf.lit(' '), 'time').cast('timestamp'))) \
.withColumn('5min_delta', (psf.col('datetime') - psf.lag('datetime').over(w)) / 60 > 5) \
.fillna(True) \
.withColumn('group_id', psf.sum(psf.col('5min_delta').cast('int')).over(w)).show()
+----------+--------+----------+----------+--------+
| date| time| datetime|5min_delta|group_id|
+----------+--------+----------+----------+--------+
|2017-02-03|22:57:00|1486159020| true| 1|
|2017-02-03|23:02:00|1486159320| false| 1|
|2017-02-04|09:56:00|1486198560| true| 2|
|2017-02-04|10:01:00|1486198860| false| 2|
|2017-02-04|10:06:00|1486199160| false| 2|
|2017-02-04|10:11:00|1486199460| false| 2|
|2017-02-04|10:16:00|1486199760| false| 2|
|2017-02-04|10:21:00|1486200060| false| 2|
|2017-02-04|10:26:00|1486200360| false| 2|
|2017-02-04|10:31:00|1486200660| false| 2|
|2017-02-04|10:36:00|1486200960| false| 2|
|2017-02-04|16:57:00|1486223820| true| 3|
|2017-02-04|17:12:00|1486224720| true| 4|
+----------+--------+----------+----------+--------+
- 第一个窗口函数是计算 2 个连续时间戳之间的时间增量(以分钟为单位)。
- 第二个,允许我们通过计算累积和来创建唯一的组标识符。每当有大于 5 分钟的间隙时,它就会增加 1。
然后,您可以计算每个组中的元素数
df \
.groupBy('group_id') \
.agg(psf.first('date').alias('date'), psf.count('*').alias('nb')) \
.show()
+--------+----------+---+
|group_id| date| nb|
+--------+----------+---+
| 1|2017-02-03| 2|
| 2|2017-02-04| 9|
| 3|2017-02-04| 1|
| 4|2017-02-04| 1|
+--------+----------+---+
推荐阅读
- excel - 为什么我的 ExportAsFixedFormat 不起作用?
- java - NumberFormatException 春天
- sql - 尝试使用 psql 在 Postgres 中获取函数的来源时,错误“列 p.proisagg 不存在”是什么意思?
- android - 媒体会话在 AOD 上“无标题”(始终显示)
- php - 将一个大字符串放入一个变量中,其中包含其他变量
- c++ - 将任意模板化向量作为函数参数传递 C++
- c++ - 如何使用 InverseKinematics 解决 MultibodyPlant 上的 IK?
- android - 我可以使用 setContentView 或 Fragments 吗?
- markdown - 将 HTML 属性添加到 Markdown 的多行部分
- cookies - 如何解决请求访问 cookie 或存储被阻止,因为它来自跟踪器并且启用了内容阻止