scala - 火花组和减少基于相邻行以及单行
问题描述
我有一个如下所示的数据框,并希望通过组合相邻的 rowa 来减少它们,即 previous.close = current.open
val df = Seq(
("Ray","2018-09-01","2018-09-10"),
("Ray","2018-09-10","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-27"),
("Ray","2018-09-27","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-24","2018-09-28"),
("Scott","2018-09-28","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-15"),
("Scott","2018-10-15","2018-09-20")
)
所需的输出如下:
(("Ray","2018-09-01","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-24","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-20"))
所以,到目前为止,我可以使用下面的 DF() 解决方案来压缩相邻的行。
df.alias("t1").join(df.alias("t2"),$"t1.name" === $"t2.name" and $"t1.close"=== $"t2.open" )
.select("t1.name","t1.open","t2.close")
.distinct.show(false)
|name |open |close |
+-----+----------+----------+
|Scott|2018-09-24|2018-09-30|
|Scott|2018-10-11|2018-09-20|
|Ray |2018-09-01|2018-09-15|
|Ray |2018-09-21|2018-09-30|
+-----+----------+----------+
我试图通过给出 $"t1.close"=!= $"t2.open" 来使用类似的样式来获得单行,然后将两者结合起来以获得最终结果。但是我得到了不需要的行,我无法正确过滤。如何做到这一点?
这篇文章 与具有复杂条件的 Spark SQL 窗口函数不同,后者将附加日期列计算为新列。
解决方案
这是一种方法:
- 如果 current等于 previous ,则创建
temp1
具有值的新列;否则电流值null
open
close
open
- 创建另一个用非空值
temp2
回填null
s 的列temp1
last
- 按 (
name
,temp2
) 对结果数据集进行分组以生成连续的日期范围
我已经修改了您的示例数据,以涵盖超过 2 行的连续日期范围的情况。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("Ray","2018-09-01","2018-09-10"),
("Ray","2018-09-10","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-27"),
("Ray","2018-09-27","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-23","2018-09-28"), // <-- Revised
("Scott","2018-09-28","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-15"),
("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")
val win = Window.partitionBy($"name").orderBy("open", "close")
val df2 = df.
withColumn("temp1", when(
row_number.over(win) === 1 || lag($"close", 1).over(win) =!= $"open", $"open")
).
withColumn("temp2", last($"temp1", ignoreNulls=true).over(
win.rowsBetween(Window.unboundedPreceding, 0)
))
df2.show
// +-----+----------+----------+----------+----------+
// | name| open| close| temp1| temp2|
// +-----+----------+----------+----------+----------+
// |Scott|2018-09-21|2018-09-23|2018-09-21|2018-09-21|
// |Scott|2018-09-23|2018-09-28| null|2018-09-21|
// |Scott|2018-09-28|2018-09-30| null|2018-09-21|
// |Scott|2018-10-05|2018-10-09|2018-10-05|2018-10-05|
// |Scott|2018-10-11|2018-10-15|2018-10-11|2018-10-11|
// |Scott|2018-10-15|2018-10-20| null|2018-10-11|
// | Ray|2018-09-01|2018-09-10|2018-09-01|2018-09-01|
// | Ray|2018-09-10|2018-09-15| null|2018-09-01|
// | Ray|2018-09-16|2018-09-18|2018-09-16|2018-09-16|
// | Ray|2018-09-21|2018-09-27|2018-09-21|2018-09-21|
// | Ray|2018-09-27|2018-09-30| null|2018-09-21|
// +-----+----------+----------+----------+----------+
以上显示了步骤的结果,并1
保留了相应连续日期范围中最早的值。步骤用于获取最新的日期范围:2
temp2
open
3
max
close
df2.
groupBy($"name", $"temp2".as("open")).agg(max($"close").as("close")).
show
// +-----+----------+----------+
// |name |open |close |
// +-----+----------+----------+
// |Scott|2018-09-21|2018-09-30|
// |Scott|2018-10-05|2018-10-09|
// |Scott|2018-10-11|2018-10-20|
// |Ray |2018-09-01|2018-09-15|
// |Ray |2018-09-16|2018-09-18|
// |Ray |2018-09-21|2018-09-30|
// +-----+----------+----------+
推荐阅读
- javascript - 反应功能组件不呈现
- xml - 如何使用groovy获取XML属性的值
- android - 防止 Oreo 或 Pie 在后台暂停 LocationListener
- amazon-web-services - 无服务器函数如何比调用服务器的普通端点更快?
- linux - 如何以毫秒为单位格式化时间输出以提高精度?
- apache-kafka - Kafka Connect SFTP 接收器 - 给定时间的文件
- javascript - 访问 .then 函数之外的变量
- algorithm - 解决旅行商问题时,分支定界算法比蛮力算法更快吗?
- javascript - 离子角度重定向到错误的路线
- c++ - 将函数指针传递给 PostMessage win32/MFC c++