scala - 使用 Spark Scala 进行区间合并
问题描述
我有间隔列表,只要有重叠,我想合并这些间隔。
示例:List((1,1),(2,2),(4,4),(5,5))
此处所需的输出是List((1,2),(4,5))
我有一个价值 2.5GB 的数字列表,我想将其转换为 range 。
注意:输入列表中没有重复项
脚步
- 输入:列表[Int]。
- 映射到元组列表: List((a,a),(b,b), ...) 。
- 使用范围合并逻辑减少它。
val l = List(List((1,1)),List((2,2)),List((4,4)),List((5,5)))
val list =sc.parallelize(l)
def merge(r1:(Int,Int),r2:(Int,Int)) :(Int,Int) = {
if(r1._2+1==r2._1) (r1._1,r2._2)
else if(r2._2+1 == r1._1) (r2._1,r1._2)
else null
}
val res = list.reduce((x,y) =>{
x.map(r1 => {
y.map(r2 => {
val m = merge(r1,r2)
m match {
case null => List(r1,r2)
case _ => List(m)
}
}).flatten
}).flatten
})
res: List[(Int, Int)] = List((4,5), (2,2), (1,2))
实际输出res: List[(Int, Int)] = List((4,5), (2,2), (1,2))
如我所料List((4,5),(1,2))
。
编辑:我的解决方案
我尝试了以下代码。似乎输入很小,但我的原始数据花费了太长时间。还有比这更好的解决方案吗?
def overlap(x: (Int,Int),y:(Int,Int)) = {
if(x._2==y._1) (x._1,y._2)
else if(x._1==y._2) (y._1,x._2)
else null
}
def isOverlapping(x: (Int,Int),y:(Int,Int)) = {
x._1 == y._1 || x._1 == y._2 || x._2==y._1 || x._2==y._2
}
val res = list.reduce((x,y) =>{
val z = x.map(r1 => {
y.map(r2 => {
val m = merge(r1,r2)
m match {
case null => List(r1,r2)
case _ =>{
List(m)
}
}
}).flatten
}).flatten
//-------compressing the accumulated list z to merge overlapping tuples
z.foldLeft(List[(Int,Int)]()) { (acc, i) => {
if (!acc.exists(isOverlapping(i, _)))
i +: acc
else
acc.map(x => {
val m = overlap(x,i)
m match {
case null => x
case _ => m
}
})
}}
//---------
})
res: List[(Int, Int)] = List((4,5), (1,2))
解决方案
我最近解决了这个问题。我使用 List[List[Int]] 作为我的收藏。
我的方法是使用排序集合,这样当我们实际尝试减少重叠间隔时,我们利用排序(我们使用开始位置作为键开始排序,但如果两个开始位置相等,我们使用结束位置)并且可以完成 O(nlogn) 复杂度的问题。我专门使用了 sorted Set,以便如果有重复的间隔,在我们进一步减少它们之前将其删除。
对集合进行排序后,我们只需检查相邻对是否重叠。我通过检查 1stPair.end >= 2ndPair.Start 来做到这一点。如果为真,则表示 Pairs 是重叠的,我们可以通过取 (1stPair.start,max(1stPair.end,2ndPair.end)) 将这 2 Pairs 更改为 1 pair。这里不需要检查对之间的开始间隔,因为它是有序的,所以 2ndPair.start 将始终 >= 1stPair.start。这是我们通过使用排序集合获得的节省。
我假设,如果这些对彼此相邻而没有重叠,我仍然认为这是重叠并减少了它。例如([1,2],[2,3] 减少为 [1,3])。整个解决方案的时间复杂度是排序的复杂度。由于我使用的是 SortedSet 自带的内置排序算法,我猜它提供了最快的 sort(O(nlogn)。为了减少间隔,它只有 1 次通过集合,所以复杂度是线性的。比较这些复杂度和“O( n)”不如“O(nlogn)”重要。所以总体复杂度是“O(nlogn)”。这是在 Scala Worksheet 中运行并检查了其他几个输入,它工作正常。
import scala.collection.SortedSet
object order extends Ordering[List[Int]] {
override def compare(a: List[Int], b: List[Int]): Int = {
if (a(0) != b(0)) a(0) - b(0)
else a(1) - b(1)
}
}
val sorted_input = SortedSet(List(6, 9), List(1, 4), List(3, 5), List(8, 12))(order)
def deduce(list: List[List[Int]], pair: List[Int]): List[List[Int]] = {
val finalList = (pair, list) match {
case (pair, head :: tail) if (pair(0) <= head(1)) => List(head(0), if (pair(1) > head(1)) pair(1) else head(1)) :: tail
case (pair, emptyList) => pair :: emptyList
}
finalList
}
sorted_input.foldLeft(List[List[Int]]())(deduce) //> res0: List[List[Int]] = List(List(6, 12), List(1, 5))
推荐阅读
- azure - 需要帮助 SmartHotel 演示管道
- javascript - 使用 Angularjs 的 Facebook 图形 API
- css - 组件超出 flex 容器
- xml - 如何使用 XSLT 2.0 在 XML 文档中查找第一个表
- java - 休眠错误:无法删除或更新父行:外键约束失败
- c - 使用 SSE2 内在存储或提取标量 int 结果的更好方法
- excel - 应用程序打开时 Microsoft Excel VBA 错误运行时 1004 导致无限错误循环阻止对 Excel 的任何使用
- c# - 在 MVC API 中提供授权和未授权的端点(路由?)
- sql - 如何按字母顺序构建连接字符串?
- c# - 为什么“statusQueryGetUri”中的“runtimeStatus”没有在计时器完成后立即设置?