首页 > 解决方案 > 使用 Spark Scala 进行区间合并

问题描述

我有间隔列表,只要有重叠,我想合并这些间隔。

示例:List((1,1),(2,2),(4,4),(5,5)) 此处所需的输出是List((1,2),(4,5))

我有一个价值 2.5GB 的数字列表,我想将其转换为 range 。

注意:输入列表中没有重复项

脚步

  1. 输入:列表[Int]。
  2. 映射到元组列表: List((a,a),(b,b), ...) 。
  3. 使用范围合并逻辑减少它。
val l = List(List((1,1)),List((2,2)),List((4,4)),List((5,5)))
val list =sc.parallelize(l)

def merge(r1:(Int,Int),r2:(Int,Int))  :(Int,Int)  = {
    if(r1._2+1==r2._1) (r1._1,r2._2)
    else if(r2._2+1 == r1._1) (r2._1,r1._2)
    else null
}

val res = list.reduce((x,y) =>{
   x.map(r1 => {
        y.map(r2 => {
            val m = merge(r1,r2)
             m match {
                case null => List(r1,r2)
                case _ => List(m)
             }
         }).flatten
    }).flatten
})

res: List[(Int, Int)] = List((4,5), (2,2), (1,2))

实际输出res: List[(Int, Int)] = List((4,5), (2,2), (1,2)) 如我所料List((4,5),(1,2))

编辑:我的解决方案

我尝试了以下代码。似乎输入很小,但我的原始数据花费了太长时间。还有比这更好的解决方案吗?

def overlap(x: (Int,Int),y:(Int,Int)) = {
    if(x._2==y._1) (x._1,y._2)
    else if(x._1==y._2) (y._1,x._2)
    else null
}

def isOverlapping(x: (Int,Int),y:(Int,Int)) = {
    x._1 == y._1 || x._1 == y._2 || x._2==y._1 || x._2==y._2 
}

val res = list.reduce((x,y) =>{
  val z =  x.map(r1 => {
        y.map(r2 => {
            val m = merge(r1,r2)
             m match {
                case null => List(r1,r2)
                case _ =>{
                     List(m)
                }
             }
         }).flatten
    }).flatten
    
//-------compressing the accumulated list z to merge overlapping tuples

    z.foldLeft(List[(Int,Int)]()) { (acc, i) => {
    if (!acc.exists(isOverlapping(i, _)))
        i +: acc
      else
        acc.map(x => {
            val m = overlap(x,i)
             m match {
                case null => x
                case _ => m
             }
        })
    }}
//---------

})


res: List[(Int, Int)] = List((4,5), (1,2))

标签: scalaapache-sparkreduce

解决方案


我最近解决了这个问题。我使用 List[List[Int]] 作为我的收藏。

我的方法是使用排序集合,这样当我们实际尝试减少重叠间隔时,我们利用排序(我们使用开始位置作为键开始排序,但如果两个开始位置相等,我们使用结束位置)并且可以完成 O(nlogn) 复杂度的问题。我专门使用了 sorted Set,以便如果有重复的间隔,在我们进一步减少它们之前将其删除。

对集合进行排序后,我们只需检查相邻对是否重叠。我通过检查 1stPair.end >= 2ndPair.Start 来做到这一点。如果为真,则表示 Pairs 是重叠的,我们可以通过取 (1stPair.start,max(1stPair.end,2ndPair.end)) 将这 2 Pairs 更改为 1 pair。这里不需要检查对之间的开始间隔,因为它是有序的,所以 2ndPair.start 将始终 >= 1stPair.start。这是我们通过使用排序集合获得的节省。

我假设,如果这些对彼此相邻而没有重叠,我仍然认为这是重叠并减少了它。例如([1,2],[2,3] 减少为 [1,3])。整个解决方案的时间复杂度是排序的复杂度。由于我使用的是 SortedSet 自带的内置排序算法,我猜它提供了最快的 sort(O(nlogn)。为了减少间隔,它只有 1 次通过集合,所以复杂度是线性的。比较这些复杂度和“O( n)”不如“O(nlogn)”重要。所以总体复杂度是“O(nlogn)”。这是在 Scala Worksheet 中运行并检查了其他几个输入,它工作正常。

 import scala.collection.SortedSet
  object order extends Ordering[List[Int]] {
    override def compare(a: List[Int], b: List[Int]): Int = {
      if (a(0) != b(0)) a(0) - b(0)
      else a(1) - b(1)
    }
  }
  val sorted_input = SortedSet(List(6, 9), List(1, 4), List(3, 5), List(8, 12))(order)
  def deduce(list: List[List[Int]], pair: List[Int]): List[List[Int]] = {
    val finalList = (pair, list) match {
      case (pair, head :: tail) if (pair(0) <= head(1)) => List(head(0), if (pair(1) > head(1)) pair(1) else head(1)) :: tail
      case (pair, emptyList)                            => pair :: emptyList
    }
    finalList
  }                                              
  sorted_input.foldLeft(List[List[Int]]())(deduce) //> res0: List[List[Int]] = List(List(6, 12), List(1, 5))

推荐阅读