首页 > 解决方案 > 将时间戳舍入到最接近的 30 秒

问题描述

我在 a 中有一个列,DF它包含timestamp格式(yyyy-mm-dd HH:mm:ss)。我需要四舍五入timestamp到最近的 30 秒。

old column                   desired column
2016-02-09 19:31:02          2016-02-09 19:31:00  
2016-02-09 19:31:35          2016-02-09 19:31:30
2016-02-09 19:31:52          2016-02-09 19:32:00
2016-02-09 19:31:28          2016-02-09 19:31:30

可以在 Pyspark 中做到这一点吗?

标签: pythonpysparktimestampunix-timestamp

解决方案


如果您使用的是 spark 版本 1.5+,则可以使用pyspark.sql.functions.second()从时间戳列中获取秒数。

import pyspark.sql.functions as f
df.withColumn("second", f.second("old_timestamp")).show()
#+-------------------+------+
#|      old_timestamp|second|
#+-------------------+------+
#|2016-02-09 19:31:02|     2|
#|2016-02-09 19:31:35|    35|
#|2016-02-09 19:31:52|    52|
#|2016-02-09 19:31:28|    28|
#+-------------------+------+

一旦你有了秒部分,你可以取这个数字,除以 30,四舍五入,然后乘以 30 得到“新”秒。

df.withColumn("second", f.second("old_timestamp"))\
    .withColumn("new_second", f.round(f.col("second")/30)*30)\
    .show()
#+-------------------+------+----------+
#|      old_timestamp|second|new_second|
#+-------------------+------+----------+
#|2016-02-09 19:31:02|     2|       0.0|
#|2016-02-09 19:31:35|    35|      30.0|
#|2016-02-09 19:31:52|    52|      60.0|
#|2016-02-09 19:31:28|    28|      30.0|
#+-------------------+------+----------+

从“新”秒开始,我们可以计算以秒为单位的偏移量,当添加到原始时间戳时,将产生所需的“四舍五入”时间戳。

df.withColumn("second", f.second("old_timestamp"))\
    .withColumn("new_second", f.round(f.col("second")/30)*30)\
    .withColumn("add_seconds", f.col("new_second") - f.col("second"))\
    .show()
#+-------------------+------+----------+-----------+
#|      old_timestamp|second|new_second|add_seconds|
#+-------------------+------+----------+-----------+
#|2016-02-09 19:31:02|     2|       0.0|       -2.0|
#|2016-02-09 19:31:35|    35|      30.0|       -5.0|
#|2016-02-09 19:31:52|    52|      60.0|        8.0|
#|2016-02-09 19:31:28|    28|      30.0|        2.0|
#+-------------------+------+----------+-----------+

如我们所见,该列中的负数意味着必须向下舍入原始时间。正数将增加时间。

为了将此时间添加到原始时间戳,首先使用 .将其转换为 unix 时间戳pyspark.sql.functions.unix_timestamp()。添加后,使用 . 将结果转换回时间戳pyspark.sql.functions.from_unixtime()

把这一切放在一起(浓缩中间步骤):

df.withColumn(
        "add_seconds",
        (f.round(f.second("old_timestamp")/30)*30) - f.second("old_timestamp")
    )\
    .withColumn(
        "new_timestamp",
        f.from_unixtime(f.unix_timestamp("old_timestamp") + f.col("add_seconds"))
    )\
    .drop("add_seconds")\
    .show()
#+-------------------+-------------------+
#|      old_timestamp|      new_timestamp|
#+-------------------+-------------------+
#|2016-02-09 19:31:02|2016-02-09 19:31:00|
#|2016-02-09 19:31:35|2016-02-09 19:31:30|
#|2016-02-09 19:31:52|2016-02-09 19:32:00|
#|2016-02-09 19:31:28|2016-02-09 19:31:30|
#+-------------------+-------------------+

推荐阅读