java - Spark 查找连续的时间范围
问题描述
我想在一个非常大的数据集中找到持续的时间戳。这需要使用 Java 在 Spark 中完成(也非常欢迎使用 Scala 中的代码示例)。
每行如下所示:
ID、开始时间、结束时间
例如数据集:
[[1, 10, 15],[1, 15, 20], [2, 10, 13], [1, 22, 33], [2, 13, 16]]
预期结果是相同 id 的所有连续时间框架,每个连续时间框架只有开始和结束时间:
[[1, 10, 20],[1, 22, 33], [2, 10, 16]]
我已经尝试了以下方法,但是由于未维护订单,因此无法解决。因此,我希望有一种更有效的方法来做到这一点
textFile.mapToPair(x -> new Tuple2<>(x[0],new Tuple2<>(x[1], x[2])
.mapValues(x -> new LinkedList<>(Arrays.asList(x)))
.reduceByKey((x,y) -> {
Tuple2<Long, Long> v1 = x.getLast();
Tuple2<Long, Long> v2 = y.getFirst();
Tuple2<Long, Long> v3 = v2;
if(v2._1().equals(v1._2())) {
v3 = new Tuple2<>(v1._1(), v2._2());
x.removeLast();
}
x.addLast(v3);
return x;
})
.flatMapValues(x -> x);
解决方案
我认为这不是 Spark 问题,而是合乎逻辑的问题。您应该考虑使用几个独立功能的选项:
- 将两个间隔绑定在一起(让我们命名它
bindEntries()
) - 将新间隔添加到间隔的间隔累加器中(让它成为
insertEntry()
)
建议,我们有模拟数据mockData
:
+---+-----+---+
| id|start|end|
+---+-----+---+
| 1| 22| 33|
| 1| 15| 20|
| 1| 10| 15|
| 2| 13| 16|
| 2| 10| 13|
+---+-----+---+
在这些功能的帮助下,我对您的问题的解决方案将是这样的:
val processed = mockData
.groupByKey(_.id)
.flatMapGroups { (id: Int, it: Iterator[Entry]) =>
processEntries(it)
}
的唯一目标processEntries()
是将每个 id 的所有条目折叠到非相交区间的集合中。这是它的签名:
def processEntries(it: Iterator[Entry]): List[Entry] =
it.foldLeft(Nil: List[Entry])(insertEntry)
此函数用于从分组条目中一一获取元素,并将它们一一推送到累加器中。
insertEntry()
处理这种插入的函数:
def insertEntry(acc: List[Entry], e: Entry): List[Entry] = acc match {
case Nil => e :: Nil
case a :: as =>
val combined = bindEntries(a, e)
combined match {
case x :: y :: Nil => x :: insertEntry(as, y)
case x :: Nil => insertEntry(as, x)
case _ => a :: as
}
}
该bindEntries()
函数应该为您处理条目的顺序:
def bindEntries(x: Entry, y: Entry): List[Entry] =
(x.start > y.end, x.end < y.start) match {
case (true, _) => y :: x :: Nil
case (_, true) => x :: y :: Nil
case _ => x.copy(start = x.start min y.start, end = x.end max y.end) :: Nil
}
bindEntries()
将返回正确排序的一两个条目的列表。这是它背后的想法:
insertEntry()
将在插入过程中为您排序所有条目。
毕竟,生成的数据集如下所示:
+---+-----+---+
| id|start|end|
+---+-----+---+
| 1| 10| 20|
| 1| 22| 33|
| 2| 10| 16|
+---+-----+---+
注意:函数insertEntry()
不是尾递归的。有一个很好的起点可以进一步优化。
并且有完整的解决方案:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
object AdHoc {
Logger.getLogger("org").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
import spark.implicits._
val processed = mockData
.groupByKey(_.id)
.flatMapGroups { (id, it) =>
processEntries(it)
}
mockData.show()
processed.show()
}
def processEntries(it: Iterator[Entry]): List[Entry] =
it.foldLeft(Nil: List[Entry])(insertEntry)
def insertEntry(acc: List[Entry], e: Entry): List[Entry] = acc match {
case Nil => e :: Nil
case a :: as =>
val combined = bindEntries(a, e)
combined match {
case x :: y :: Nil => x :: insertEntry(as, y)
case x :: Nil => insertEntry(as, x)
case _ => a :: as
}
}
def bindEntries(x: Entry, y: Entry): List[Entry] =
(x.start > y.end, x.end < y.start) match {
case (true, _) => y :: x :: Nil
case (_, true) => x :: y :: Nil
case _ => x.copy(start = x.start min y.start, end = x.end max y.end) :: Nil
}
lazy val mockData: Dataset[Entry] = spark.createDataset(Seq(
Entry(1, 22, 33),
Entry(1, 15, 20),
Entry(1, 10, 15),
Entry(2, 13, 16),
Entry(2, 10, 13)
))
case class Entry(id: Int, start: Int, end: Int)
implicit lazy val entryEncoder: Encoder[Entry] = Encoders.product[Entry]
lazy val spark: SparkSession = SparkSession.builder()
.master("local")
.getOrCreate()
}
推荐阅读
- javascript - 循环进度条 - 箭头类型
- selenium - selenium 测试自动化,IE11 无法使用富文本元素访问
- apache - http2和http1请求的cpu使用比较
- python - 矩阵矩阵乘法的 CPU 时间
- python - 使用 axhline 时删除了 Matplotlib y 轴刻度
- c++ - 在头文件中声明和定义且仅在其 cpp 文件中使用的变量的多重定义错误
- authentication - 在 OWIN 中共享旧的 FormsAuthentication cookie
- vhdl - 使用 QuestaSim 编译 VHDL 时警告“范围选择方向不能确定聚合索引范围方向”
- node.js - nodejs 将对象导出到特定程序
- powershell - powershell 调用命令不处理 try-cache 块