python - 使用 Windows 分组 5 分钟时间范围
问题描述
csv文件是:
#+----+-----------+-------------------+
#|col1| col2| timestamp|
#+----+-----------+-------------------+
#| 0|Town Street|01-02-2017 06:01:00|
#| 0|Town Street|01-02-2017 06:03:00|
#| 0|Town Street|01-02-2017 06:05:00|
#| 0|Town Street|01-02-2017 06:06:00|
#| 0|Town Street|02-02-2017 10:01:00|
#| 0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+
比较每个日期的时间,看看是否有 5 分钟的差异,如果他们是数他们
输出:
#+----+-----------+-------------------+
#|col1| col2| timestamp|
#+----+-----------+-------------------+
#| 0|Town Street|01-02-2017 06:01:00|
#| 0|Town Street|01-02-2017 06:03:00|
#| 0|Town Street|01-02-2017 06:05:00|
#| 0|Town Street|01-02-2017 06:06:00|
#| 0|Town Street|02-02-2017 10:01:00|
#| 0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+
现在的代码:
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
def my_main(sc, my_dataset_dir):
sqlContext = SQLContext(sc)
df = sqlContext.read.csv(my_dataset_dir,sep=';').rdd.zipWithIndex().filter(lambda x: x[1] > 1).map(lambda x: x[0]).toDF(['status','title','datetime'])
此代码仅给出 5 分钟窗口的空结果。
解决方案
不确定这是否正是您想要的,但它应该将您推向正确的方向。您可以将时间戳转换为timestamptype
and datetype
。在. window
_ partitionBy
_ rangebetween
_seconds(300)
#df.show()
#sampledataframe
#+----+-----------+-------------------+
#|col1| col2| timestamp|
#+----+-----------+-------------------+
#| 0|Town Street|01-02-2017 06:01:00|
#| 0|Town Street|01-02-2017 06:03:00|
#| 0|Town Street|01-02-2017 06:05:00|
#| 0|Town Street|01-02-2017 06:06:00|
#| 0|Town Street|02-02-2017 10:01:00|
#| 0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("date").orderBy(F.col("timestamp").cast("long")).rangeBetween(Window.currentRow,60*5)
df.withColumn("timestamp", F.to_timestamp("timestamp",'MM-dd-yyyy HH:mm:ss'))\
.withColumn("date", F.to_date("timestamp"))\
.withColumn('collect', F.size(F.collect_list("timestamp").over(w))).filter("collect>1")\
.select(F.date_format("date","yyyy-MM-dd").alias("date"), F.array(F.date_format("timestamp","HH:mm:ss"),F.col("collect")).alias("time"))\
.orderBy("date").show()
#+----------+-------------+
#| date| time|
#+----------+-------------+
#|2017-01-02|[06:01:00, 4]|
#|2017-01-02|[06:05:00, 2]|
#|2017-01-02|[06:03:00, 3]|
#|2017-02-02|[10:01:00, 2]|
#+----------+-------------+
推荐阅读
- c++ - 不使用命名空间 std 就无法编译的程序;
- python - Python Pywinauto 是否可以关闭应用程序连接的实例?我没有看到任何属性关闭或与应用程序断开连接
- javascript - React Javascript 值显示未定义
- vb.net - 如何使用 System.Drawing.Imaging 保存图像而不自动旋转它?
- linux - 如何检查文件名中包含特定字符串的文件是否存在于bash中?
- virtual-machine - QEMU 中的内核参数顺序是否重要?
- wordpress - 赛普拉斯问题取值并进行比较。范围变量
- python - 删除数据框中的顽固 \r 并创建 CSV
- javascript - 如何使用 Styled-Component 和 Material-UI 对组件进行主题化
- php - 如何从数组中提取子数据