Find the max value within a Dataframe based on an interval of a different Dataframe

Problem Description

I have two dataframes: one with 15-minute intervals, and another with a starttime, an endtime, and a value. I would like to find the max value in the second dataframe that falls within each interval of the first dataframe.

Schema of both DataFrames:

DF1 Schema
 |-- start: timestamp (nullable = false)
 |-- end: timestamp (nullable = false)

DF2 Schema
 |-- starttime: timestamp (nullable = false)
 |-- endtime: timestamp (nullable = false)
 |-- value: long (nullable = false)

I have created the solution below, though I'm worried about its performance, and I'm wondering if there is a better way to accomplish this without looping. I thought of a join, but since I need to find the max of df2 within an interval of df1, I'm not sure what I'd join on.

import java.sql.Timestamp
import java.time.{ZonedDateTime, ZoneOffset}
import org.apache.spark.sql.functions.{col, lit, max}

case class maxCaseClass(starttime: ZonedDateTime, endtime: ZonedDateTime, max: Long)

val maxInInterval = Seq.newBuilder[maxCaseClass]
// Collect the distinct intervals to the driver, then run one Spark job per interval.
val distinctIntervals = df1.select("start", "end").distinct().collect()
distinctIntervals.foreach { row =>
  val starttime = row.getAs[Timestamp]("start")
  val endtime = row.getAs[Timestamp]("end")
  // Max over every df2 row whose [starttime, endtime] overlaps the current interval
  val maxDF = df2
    .filter(col("endtime") >= lit(starttime) && col("starttime") <= lit(endtime))
    .agg(max("value").as("max"))
  maxInInterval += maxCaseClass(
    starttime.toLocalDateTime.atZone(ZoneOffset.UTC),
    endtime.toLocalDateTime.atZone(ZoneOffset.UTC),
    maxDF.head().getAs[Long]("max")
  )
}

Rather than ending up with a sequence, I just want to add a new column to df1 with the max value, but I'm not sure how to accomplish this.

Tags: scala, dataframe, apache-spark, apache-spark-sql

Solution


You can join DF1 with DF2 on the interval-overlap condition, then aggregate:

// Join each DF1 interval to every DF2 row whose [starttime, endtime] overlaps it,
// then take the max value per interval (same overlap condition as the original filter).
val result = DF1.join(
  DF2,
  col("endtime") >= col("start") && col("starttime") <= col("end")
).groupBy("start", "end")
 .agg(max("value").as("max_value"))
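Since the question also asks for the max as a new column on df1 rather than a separate result, one possible follow-up (assuming the (start, end) pair uniquely identifies an interval in DF1) is to left-join the aggregated result back onto DF1; intervals with no overlapping DF2 rows then carry a null max_value:

// Sketch only: attach max_value back onto DF1 (assumes (start, end) identifies each interval).
// The left join keeps DF1 intervals that had no overlapping DF2 rows (their max_value is null).
val df1WithMax = DF1.join(result, Seq("start", "end"), "left")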

Or using SQL with a correlated subquery:

DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")


val result = spark.sql("""
  select df1.*,
         (select max(value)
          from df2
          where df2.endtime >= df1.start
            and df2.starttime <= df1.end) as max_value
  from df1
""")
