首页 > 解决方案 > 火花组和减少基于相邻行以及单行

问题描述

我有一个如下所示的数据框,并希望通过组合相邻的 rowa 来减少它们,即 previous.close = current.open

val df = Seq(
  ("Ray","2018-09-01","2018-09-10"),
  ("Ray","2018-09-10","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-27"),
  ("Ray","2018-09-27","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-24","2018-09-28"),
  ("Scott","2018-09-28","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-15"),
  ("Scott","2018-10-15","2018-09-20")
)

所需的输出如下:

  (("Ray","2018-09-01","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-24","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-20"))

所以,到目前为止,我可以使用下面的 DF() 解决方案来压缩相邻的行。

df.alias("t1").join(df.alias("t2"),$"t1.name" === $"t2.name" and $"t1.close"=== $"t2.open" )
  .select("t1.name","t1.open","t2.close")
  .distinct.show(false) 

|name |open      |close     |
+-----+----------+----------+
|Scott|2018-09-24|2018-09-30|
|Scott|2018-10-11|2018-09-20|
|Ray  |2018-09-01|2018-09-15|
|Ray  |2018-09-21|2018-09-30|
+-----+----------+----------+

我试图通过给出 $"t1.close"=!= $"t2.open" 来使用类似的样式来获得单行,然后将两者结合起来以获得最终结果。但是我得到了不需要的行,我无法正确过滤。如何做到这一点?

这篇文章 与具有复杂条件的 Spark SQL 窗口函数不同,后者将附加日期列计算为新列。

标签: scalaapache-sparkapache-spark-sql

解决方案


这是一种方法:

  1. 如果 current等于 previous ,则创建temp1具有值的新列;否则电流值nullopencloseopen
  2. 创建另一个用非空值temp2回填nulls 的列temp1last
  3. 按 ( name, temp2) 对结果数据集进行分组以生成连续的日期范围

我已经修改了您的示例数据,以涵盖超过 2 行的连续日期范围的情况。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("Ray","2018-09-01","2018-09-10"),
  ("Ray","2018-09-10","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-27"),
  ("Ray","2018-09-27","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-23","2018-09-28"),  // <-- Revised
  ("Scott","2018-09-28","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-15"),
  ("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")

val win = Window.partitionBy($"name").orderBy("open", "close")

val df2 = df.
  withColumn("temp1", when(
    row_number.over(win) === 1 || lag($"close", 1).over(win) =!= $"open", $"open")
  ).
  withColumn("temp2", last($"temp1", ignoreNulls=true).over(
    win.rowsBetween(Window.unboundedPreceding, 0)
  ))

df2.show
// +-----+----------+----------+----------+----------+
// | name|      open|     close|     temp1|     temp2|
// +-----+----------+----------+----------+----------+
// |Scott|2018-09-21|2018-09-23|2018-09-21|2018-09-21|
// |Scott|2018-09-23|2018-09-28|      null|2018-09-21|
// |Scott|2018-09-28|2018-09-30|      null|2018-09-21|
// |Scott|2018-10-05|2018-10-09|2018-10-05|2018-10-05|
// |Scott|2018-10-11|2018-10-15|2018-10-11|2018-10-11|
// |Scott|2018-10-15|2018-10-20|      null|2018-10-11|
// |  Ray|2018-09-01|2018-09-10|2018-09-01|2018-09-01|
// |  Ray|2018-09-10|2018-09-15|      null|2018-09-01|
// |  Ray|2018-09-16|2018-09-18|2018-09-16|2018-09-16|
// |  Ray|2018-09-21|2018-09-27|2018-09-21|2018-09-21|
// |  Ray|2018-09-27|2018-09-30|      null|2018-09-21|
// +-----+----------+----------+----------+----------+

以上显示了步骤的结果,并1保留了相应连续日期范围中最早的值。步骤用于获取最新的日期范围:2temp2open3maxclose

df2.
  groupBy($"name", $"temp2".as("open")).agg(max($"close").as("close")).
  show
// +-----+----------+----------+
// |name |open      |close     |
// +-----+----------+----------+
// |Scott|2018-09-21|2018-09-30|
// |Scott|2018-10-05|2018-10-09|
// |Scott|2018-10-11|2018-10-20|
// |Ray  |2018-09-01|2018-09-15|
// |Ray  |2018-09-16|2018-09-18|
// |Ray  |2018-09-21|2018-09-30|
// +-----+----------+----------+

推荐阅读