postgresql - 如何使用 sparksql 从 postgres 表中检索自上次时间戳以来的所有记录
问题描述
我需要根据时间戳检索新获取的记录。我使用了“max”,它只给出 1 条记录,desc 和 limit 也是如此
当数据加载到表中时我需要动态获取记录
Task Task-B Timestamp
aaa bbb 2020-09-02 16:45:12
aa2 bb2 2020-09-02 17:16:10
aa3 bb3 2020-09-03 10:09:15
aa4 bb4 2002-09-01 09:14:34
任务 aaa 到 aa3 是新的,我只需要检索那个
Task Task-B Timestamp
aaa bbb 2020-09-02 16:45:12
aa2 bb2 2020-09-02 17:16:10
aa3 bb3 2020-09-03 10:09:15
解决方案
任务:显示过去 24 小时的数据。
import org.apache.spark.sql.functions._
import spark.implicits._
case class Task(task: String, taskR: String, timestamp: String)
val sourceDF = Seq(Task("aaa", "bbb", "2020-09-02 16:45:12"),
Task("aa2", "bb2", "2020-09-02 17:16:10"),
Task("aa3", "bb3", "2020-09-03 10:09:15"),
Task("aa4", "bb4", "2002-09-01 09:14:34")
).toDF()
sourceDF.show(false)
// +----+-----+-------------------+
// |task|taskR|timestamp |
// +----+-----+-------------------+
// |aaa |bbb |2020-09-02 16:45:12|
// |aa2 |bb2 |2020-09-02 17:16:10|
// |aa3 |bb3 |2020-09-03 10:09:15|
// |aa4 |bb4 |2002-09-01 09:14:34|
// +----+-----+-------------------+
val resDF = sourceDF
.where(((unix_timestamp(current_timestamp()) - unix_timestamp('timestamp)) / 3600) <= 24)
resDF.show(false)
resDF.printSchema()
// +----+-----+-------------------+
// |task|taskR|timestamp |
// +----+-----+-------------------+
// |aaa |bbb |2020-09-02 16:45:12|
// |aa2 |bb2 |2020-09-02 17:16:10|
// |aa3 |bb3 |2020-09-03 10:09:15|
// +----+-----+-------------------+
//
// root
// |-- task: string (nullable = true)
// |-- taskR: string (nullable = true)
// |-- timestamp: string (nullable = true)
推荐阅读
- sql-server - WITH 子句中的 Getdate() 是否应该声明为变量?
- sonarqube - SonarQube 第一个后台任务比下面的要快
- reactjs - 在 mapStateToProps 中访问组件本地状态
- kotlin - 基于类型属性的 Kotlin 返回列表
- php - Zend 框架接口 'Zend\ModuleManager\Feature\ConfigProviderInterface' 未找到
- html - 在调整大小时控制 Bootstrap 如何对列/行进行排序
- html - 在我的节点 js 中,requre 函数不起作用
- javascript - Office.js:如何从使用 Angular CLI 生成的函数文件中调用函数
- python - 在 boost-python 中将 Python 对象的所有权转移到 C++
- c# - 如何使用 MVVM 模式在 WPF 中动态绑定 UserControl