scala - Find the max value within a Dataframe based on an interval of a different Dataframe
Problem description
I have two dataframes: one with 15-minute intervals and another with a starttime, an endtime, and a value. I would like to find the max value of the second dataframe that falls within each interval of the first dataframe.
Schema of both DataFrames:
DF1 Schema
|-- start: timestamp (nullable = false)
|-- end: timestamp (nullable = false)
DF2 Schema
|-- starttime: timestamp (nullable = false)
|-- endtime: timestamp (nullable = false)
|-- value: long (nullable = false)
I have created the solution below, though I'm worried about its performance. I'm wondering if there is a better way to accomplish this without looping. I thought of joining, but since I need to find the max of df2 within an interval of df1, I'm not sure what I'd join on.
import java.sql.Timestamp
import java.time.{ZoneOffset, ZonedDateTime}
import org.apache.spark.sql.functions.{col, lit, max}

case class MaxCaseClass(starttime: ZonedDateTime, endtime: ZonedDateTime, max: Long)

val maxInInterval = Seq.newBuilder[MaxCaseClass]
// Collect the distinct intervals to the driver; each iteration below runs a separate Spark job
val distinctIntervals = df1.select("start", "end").distinct().collect()
distinctIntervals.foreach { row =>
  val starttime = row.getAs[Timestamp]("start")
  val endtime = row.getAs[Timestamp]("end")
  // Max value over the df2 rows that overlap [starttime, endtime]
  val maxDF = df2
    .filter(col("endtime") >= lit(starttime) && col("starttime") <= lit(endtime))
    .agg(max("value").as("max"))
  maxInInterval += MaxCaseClass(
    // toLocalDateTime avoids parsing Timestamp.toString, whose format LocalDateTime.parse rejects
    starttime.toLocalDateTime.atZone(ZoneOffset.UTC),
    endtime.toLocalDateTime.atZone(ZoneOffset.UTC),
    maxDF.head().getAs[Long]("max")
  )
}
Rather than ending up with a sequence, I just want to add a new column to df1 with the max value, but I'm not sure how to accomplish this.
Solution
You can join df1 with df2 on the interval condition, then group by the interval and aggregate:
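import org.apache.spark.sql.functions.{col, max}

// Inner join on a range predicate: keeps DF2 rows whose [starttime, endtime] lies within [start, end]
val result = DF1.join(
  DF2,
  (col("end") >= col("endtime")) && (col("starttime") >= col("start"))
).groupBy("start", "end")
  .agg(max("value").as("max_value"))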
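The join above keeps only DF2 rows that lie entirely inside a DF1 interval, whereas the loop in the question matches any overlap (endtime >= start and starttime <= end). An inner join also drops DF1 intervals that have no matching row. Since the goal is to add a column to df1, a left join with the overlap predicate may be closer to what is wanted. A minimal sketch, assuming df1 has only the start and end columns; withMaxInInterval is a hypothetical helper name:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max}

// Hypothetical helper: returns df1's (start, end) intervals plus a max_value column,
// using the same overlap predicate as the loop in the question.
def withMaxInInterval(df1: DataFrame, df2: DataFrame): DataFrame = {
  val overlaps = col("endtime") >= col("start") && col("starttime") <= col("end")
  df1
    .join(df2, overlaps, "left")            // left join keeps intervals with no overlapping row
    .groupBy("start", "end")
    .agg(max("value").as("max_value"))      // max_value is null when nothing overlaps
}
```

Because the result is grouped by (start, end), duplicate intervals in df1 collapse into one row; if df1 carried other columns, they would need to be added to the groupBy or joined back afterwards.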
Or, using SQL with a correlated subquery:
DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")
// `start` and `end` are backquoted because END is a SQL keyword
val result = spark.sql("""
  select *,
    (select max(value) from df2 where `end` >= endtime and starttime >= `start`) as max_value
  from df1
""")
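If the correlated subquery is rejected by your Spark version's analyzer, or you prefer a plain join, the same result can be written in SQL as a LEFT JOIN plus GROUP BY. This sketch uses the overlap predicate from the question's loop rather than the containment predicate above, and assumes the df1 and df2 temp views registered above; maxInIntervalSql is a hypothetical helper name:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: df1's intervals plus max_value, assuming temp views df1 and df2 exist.
// LEFT JOIN keeps intervals with no overlapping df2 row (their max_value is NULL).
def maxInIntervalSql(spark: SparkSession): DataFrame =
  spark.sql("""
    SELECT d1.`start`, d1.`end`, MAX(d2.value) AS max_value
    FROM df1 d1
    LEFT JOIN df2 d2
      ON d2.endtime >= d1.`start` AND d2.starttime <= d1.`end`
    GROUP BY d1.`start`, d1.`end`
  """)
```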