How to retrieve all records since the last timestamp from a Postgres table with Spark SQL

Problem description

I need to retrieve newly ingested records based on a timestamp. I tried `max`, but it returns only one record, and the same goes for `ORDER BY ... DESC` with `LIMIT`.

I need to pick up the new records dynamically whenever data is loaded into the table.

       Task     Task-B    Timestamp
       aaa      bbb       2020-09-02 16:45:12
       aa2      bb2       2020-09-02 17:16:10
       aa3      bb3       2020-09-03 10:09:15
       aa4      bb4       2002-09-01 09:14:34 

Tasks aaa through aa3 are new, and those are the only rows I need to retrieve:

       Task     Task-B    Timestamp
       aaa      bbb       2020-09-02 16:45:12
       aa2      bb2       2020-09-02 17:16:10
       aa3      bb3       2020-09-03 10:09:15
       

Tags: postgresql, apache-spark-sql, timestamp

Solution


Task: show the data from the last 24 hours.

import org.apache.spark.sql.functions._
import spark.implicits._

case class Task(task: String, taskR: String, timestamp: String)

val sourceDF = Seq(
  Task("aaa", "bbb", "2020-09-02 16:45:12"),
  Task("aa2", "bb2", "2020-09-02 17:16:10"),
  Task("aa3", "bb3", "2020-09-03 10:09:15"),
  Task("aa4", "bb4", "2002-09-01 09:14:34")
).toDF()

sourceDF.show(false)
//  +----+-----+-------------------+
//  |task|taskR|timestamp          |
//  +----+-----+-------------------+
//  |aaa |bbb  |2020-09-02 16:45:12|
//  |aa2 |bb2  |2020-09-02 17:16:10|
//  |aa3 |bb3  |2020-09-03 10:09:15|
//  |aa4 |bb4  |2002-09-01 09:14:34|
//  +----+-----+-------------------+


// Keep only rows whose timestamp falls within the last 24 hours:
// unix_timestamp parses the string column using the default
// "yyyy-MM-dd HH:mm:ss" format and returns seconds since the epoch.
val resDF = sourceDF
  .where(((unix_timestamp(current_timestamp()) - unix_timestamp('timestamp)) / 3600) <= 24)

resDF.show(false)
resDF.printSchema()
//  +----+-----+-------------------+
//  |task|taskR|timestamp          |
//  +----+-----+-------------------+
//  |aaa |bbb  |2020-09-02 16:45:12|
//  |aa2 |bb2  |2020-09-02 17:16:10|
//  |aa3 |bb3  |2020-09-03 10:09:15|
//  +----+-----+-------------------+
//
//  root
//  |-- task: string (nullable = true)
//  |-- taskR: string (nullable = true)
//  |-- timestamp: string (nullable = true)
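
The fixed 24-hour window above only approximates what the question asks for. To retrieve exactly the rows added since the last run, a common pattern is to persist the maximum timestamp seen so far (a watermark) and push it into the Postgres query as a predicate, so only new rows are transferred. A minimal sketch under stated assumptions: the JDBC URL, credentials, `tasks` table name, and the way `lastTs` is stored between runs are all hypothetical, not part of the original answer.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Watermark from the previous run; in practice this would be read from a
// checkpoint file or a small control table, not hard-coded.
val lastTs = "2020-09-02 00:00:00"

// Wrap the filter in the dbtable subquery so Postgres evaluates it and
// only rows newer than the watermark cross the wire. "Timestamp" is
// double-quoted because the column name is mixed-case in Postgres.
val newRowsDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/mydb")  // assumed URL
  .option("dbtable", s"""(SELECT * FROM tasks WHERE "Timestamp" > '$lastTs') AS t""")
  .option("user", "user")          // assumed credentials
  .option("password", "password")
  .load()

newRowsDF.show(false)

// Advance the watermark to the newest timestamp in this batch so the
// next run starts where this one left off (guard for an empty batch).
if (!newRowsDF.isEmpty) {
  val nextTs = newRowsDF.agg(max("Timestamp")).as[String].first()
  // persist nextTs for the next run
}
```

Note the role of `max` here: it is used only to advance the watermark for the next run. The filtered query, not `max`, returns the batch of new rows, which is why using `max` directly appeared to give a single record.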
