首页 > 解决方案 > Spark Scala - 时间戳的最小差异

问题描述

我试图找出 24 小时的时间段,特定 24 小时窗口内的最低温度和最高温度是多少,然后找到差异最大的那一天。一旦我有了最大的差异,我就会尝试确定发生这种差异的最短时间范围。

例如,我处理的数据示例如下所示:来自以下代码的当前输出:

+------------+-------------------+---------+-------+-------+
|TemperatureF|               Date|timestamp|MinTemp|MaxTemp|
+------------+-------------------+---------+-------+-------+
|        28.0| 01/01/2000 6:53 AM|946709580|   28.0|   37.4|
|        28.0| 01/01/2000 7:53 AM|946713180|   28.0|   37.4|
|        28.0| 01/01/2000 8:53 AM|946716780|   28.0|   37.4|
|        30.2|01/01/2000 10:24 PM|946765440|   30.2|   37.4|
|        30.9|01/01/2000 10:53 PM|946767180|   30.9|   37.4|
|        37.4| 01/02/2000 4:39 AM|946787940|   28.0|   37.4|
|        36.0| 01/02/2000 4:53 AM|946788780|   28.0|   36.0|
|        36.0| 01/02/2000 5:53 AM|946792380|   28.0|   36.0|
+------------+-------------------+---------+-------+-------+

如果我查看 MinTemp 为 28 且 MaxTemp 为 37.4 的前 3 行。因为我在 24 小时内(2000 年 1 月 1 日上午 6:53 到 2000 年 1 月 2 日上午 6:53)查看这 2 (9.4) 之间的差异。MaxTemp 出现在 2000 年 1 月 2 日凌晨 4 点 39 分。鉴于此,我希望我的答案是:

+-----------------+-------------------+----------+
|Start Time       |            EndTime|difference|
+-----------------+-------------------+----------+
|01/01/2000 8:53AM|  01/02/2000 4:39AM|9.4       |
+-----------------+-------------------+----------+

这是因为,这是达到这种温差的最短时间范围。

我认为我要采取的方法是为开始时间、当时的温度(与该时段的最小值匹配)、结束时间、当时的温度(与该时段的最大值匹配)创建列。然后取一个时间差,得到最短的时间差。

示例输出:

+-----------------+-----------+-----------------+-----------+-------+-------+
StartTime         |Temperature|EndTime.         |Temperature|MinTemp|MaxTemp|
+-----------------+-----------+-----------------+-----------+-------+-------+
01/01/2000 6:53AM | 28.0      |01/02/2000 4:39AM| 37.4      | 28.0  | 37.4  |
01/01/2000 7:53AM | 28.0      |01/02/2000 4:39AM| 37.4      | 28.0  | 37.4  |
01/01/2000 8:53AM | 28.0      |01/02/2000 4:39AM| 37.4      | 28.0  | 37.4  |
+-----------------+-----------+-----------------+-----------+-------+-------+

从这里我将计算最小的时间差和温度差。

对最好的方法有什么想法吗?

当前代码:

val data = osh.select(col("TemperatureF"), concat(format_string("%02d",col("Month")),lit("/"),format_string("%02d",col("Day")),lit("/"),col("Year"),lit(" "),col("TimeCST")).as("Date")).filter(col("TemperatureF") > -9999)

val oshdata = data.withColumn("timestamp",unix_timestamp(to_timestamp(col("Date"),"MM/dd/yyyy hh:mm a")))

import org.apache.spark.sql.expressions._
val myWindow = Window.orderBy("timestamp").rangeBetween(Window.currentRow, 86400)
val myData = oshdata.withColumn("MinTemp", min(col("TemperatureF")).over(myWindow))
  .withColumn("MaxTemp",max(col("TemperatureF")).over(myWindow))


myData.createOrReplaceTempView("oshView")
spark.sqlContext.sql("Select * from oshView where TemperatureF == MinTemp or TemperatureF == MaxTemp").show(25)

标签: scalaapache-spark

解决方案


我从您的帖子中导入了数据并编写了以下 SQL 来帮助解决您的问题(基于我对您问题的理解......);但是,我希望这会有所帮助。

    df = spark_sql.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", ",") 
        .load("/path/to/minimum-difference-in-timestamps.txt")

    df.registerTempTable("TABLE1")
    spark_sql.sql(""" CACHE TABLE View1 AS 
                      SELECT *  FROM TABLE1 ORDER BY TemperatureF DESC LIMIT 1""")

    spark_sql.sql("""  CACHE TABLE View2 AS 
                       SELECT A.Date StartTime,
                              B.Date EndTime, 
                              CAST(B.TemperatureF - A.TemperatureF AS FLOAT) difference ,
                              A.TemperatureF,
                              A.MinTemp,
                              A.MaxTemp,
                              B.timestamp END_TIME_TS
                       FROM (
                                SELECT * FROM TABLE1 WHERE 
                                timestamp < (SELECT timestamp FROM View1) 
                                ORDER BY TemperatureF ASC, timestamp DESC LIMIT 1 
                            ) AS A JOIN 
                              View1 AS B ON A.MaxTemp == B.TemperatureF""")

    spark_sql.sql(""" SELECT StartTime,EndTime,difference FROM View2""").show()
    spark_sql.sql(""" SELECT A.Date StartTime, 
                             A.TemperatureF Temperature,
                             B.EndTime ,
                             B.TemperatureF Temperature_END,
                             B.MinTemp, 
                             B.MaxTemp
                     FROM  TABLE1 AS A JOIN
                           View2 AS B 
                           ON A.MaxTemp =B.MaxTemp 
                              AND 
                              A.MinTemp == B.MinTemp
                              AND 
                              A.timestamp < B.END_TIME_TS
     """).show()


+-----------------+-----------------+----------+
|        StartTime|          EndTime|difference|
+-----------------+-----------------+----------+
|01/01/2000 8:53AM|01/02/2000 4:39AM|       9.4|
+-----------------+-----------------+----------+

+-----------------+-----------+-----------------+---------------+-------+-------+
|        StartTime|Temperature|          EndTime|Temperature_END|MinTemp|MaxTemp|
+-----------------+-----------+-----------------+---------------+-------+-------+
|01/01/2000 6:53AM|       28.0|01/02/2000 4:39AM|           28.0|   28.0|   37.4|
|01/01/2000 7:53AM|       28.0|01/02/2000 4:39AM|           28.0|   28.0|   37.4|
|01/01/2000 8:53AM|       28.0|01/02/2000 4:39AM|           28.0|   28.0|   37.4|
+-----------------+-----------+-----------------+---------------+-------+-------+

时间戳的最小差异.txt:

TemperatureF,Date,timestamp,MinTemp,MaxTemp
28.0,01/01/2000 6:53AM,946709580,28.0,37.4
28.0,01/01/2000 7:53AM,946713180,28.0,37.4
28.0,01/01/2000 8:53AM,946716780,28.0,37.4
30.2,01/01/2000 10:24PM,946765440,30.2,37.4
30.9,01/01/2000 10:53PM,946767180,30.9,37.4
37.4,01/02/2000 4:39AM,946787940,28.0,37.4
36.0,01/02/2000 4:53AM,946788780,28.0,36.0
36.0,01/02/2000 5:53AM,946792380,28.0,36.0

推荐阅读