首页 > 解决方案 > Spark 查找连续的时间范围

问题描述

我想在一个非常大的数据集中找到持续的时间戳。这需要使用 Java 在 Spark 中完成(也非常欢迎使用 Scala 中的代码示例)。

每行如下所示:

ID、开始时间、结束时间

例如数据集:

[[1, 10, 15],[1, 15, 20], [2, 10, 13], [1, 22, 33], [2, 13, 16]]

预期结果是相同 id 的所有连续时间框架,每个连续时间框架只有开始和结束时间:

[[1, 10, 20],[1, 22, 33], [2, 10, 16]]

我已经尝试了以下方法,但是由于未维护订单,因此无法解决。因此,我希望有一种更有效的方法来做到这一点

textFile.mapToPair(x -> new Tuple2<>(x[0],new Tuple2<>(x[1], x[2])
    .mapValues(x -> new LinkedList<>(Arrays.asList(x)))
    .reduceByKey((x,y) -> {
         Tuple2<Long, Long> v1 = x.getLast();
         Tuple2<Long, Long> v2 = y.getFirst();
         Tuple2<Long, Long> v3 = v2;
         if(v2._1().equals(v1._2())) {
              v3 = new Tuple2<>(v1._1(), v2._2());
              x.removeLast();
         }
         x.addLast(v3);
         return x;
    })
    .flatMapValues(x -> x);

标签: javaapache-spark

解决方案


我认为这不是 Spark 问题,而是合乎逻辑的问题。您应该考虑使用几个独立功能的选项:

  • 将两个间隔绑定在一起(让我们命名它bindEntries()
  • 将新间隔添加到间隔的间隔累加器中(让它成为insertEntry()

建议,我们有模拟数据mockData

+---+-----+---+
| id|start|end|
+---+-----+---+
|  1|   22| 33|
|  1|   15| 20|
|  1|   10| 15|
|  2|   13| 16|
|  2|   10| 13|
+---+-----+---+

在这些功能的帮助下,我对您的问题的解决方案将是这样的:

val processed = mockData
      .groupByKey(_.id)
      .flatMapGroups { (id: Int, it: Iterator[Entry]) =>
        processEntries(it)
      }

的唯一目标processEntries()是将每个 id 的所有条目折叠到非相交区间的集合中。这是它的签名:

def processEntries(it: Iterator[Entry]): List[Entry] =
    it.foldLeft(Nil: List[Entry])(insertEntry)

此函数用于从分组条目中一一获取元素,并将它们一一推送到累加器中。

insertEntry()处理这种插入的函数:

def insertEntry(acc: List[Entry], e: Entry): List[Entry] = acc match {
  case Nil => e :: Nil
  case a :: as =>
    val combined = bindEntries(a, e)
    combined match {
      case x :: y :: Nil => x :: insertEntry(as, y)
      case x :: Nil => insertEntry(as, x)
      case _ => a :: as
    }
}

bindEntries()函数应该为您处理条目的顺序:

def bindEntries(x: Entry, y: Entry): List[Entry] =
  (x.start > y.end, x.end < y.start) match {
    case (true, _) => y :: x :: Nil
    case (_, true) => x :: y :: Nil
    case _ => x.copy(start = x.start min y.start, end = x.end max y.end) :: Nil
  }

bindEntries()将返回正确排序的一两个条目的列表。这是它背后的想法:

在此处输入图像描述

insertEntry()将在插入过程中为您排序所有条目。

毕竟,生成的数据集如下所示:

+---+-----+---+
| id|start|end|
+---+-----+---+
|  1|   10| 20|
|  1|   22| 33|
|  2|   10| 16|
+---+-----+---+

注意:函数insertEntry()不是尾递归的。有一个很好的起点可以进一步优化。

并且有完整的解决方案:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object AdHoc {

  Logger.getLogger("org").setLevel(Level.OFF)


  def main(args: Array[String]): Unit = {
    import spark.implicits._

    val processed = mockData
      .groupByKey(_.id)
      .flatMapGroups { (id, it) =>
        processEntries(it)
      }

    mockData.show()
    processed.show()
  }


  def processEntries(it: Iterator[Entry]): List[Entry] =
    it.foldLeft(Nil: List[Entry])(insertEntry)

  def insertEntry(acc: List[Entry], e: Entry): List[Entry] = acc match {
    case Nil => e :: Nil
    case a :: as =>
      val combined = bindEntries(a, e)
      combined match {
        case x :: y :: Nil => x :: insertEntry(as, y)
        case x :: Nil => insertEntry(as, x)
        case _ => a :: as
      }
  }

  def bindEntries(x: Entry, y: Entry): List[Entry] =
    (x.start > y.end, x.end < y.start) match {
      case (true, _) => y :: x :: Nil
      case (_, true) => x :: y :: Nil
      case _ => x.copy(start = x.start min y.start, end = x.end max y.end) :: Nil
    }

  lazy val mockData: Dataset[Entry] = spark.createDataset(Seq(
    Entry(1, 22, 33),
    Entry(1, 15, 20),
    Entry(1, 10, 15),
    Entry(2, 13, 16),
    Entry(2, 10, 13)
  ))

  case class Entry(id: Int, start: Int, end: Int)

  implicit lazy val entryEncoder: Encoder[Entry] = Encoders.product[Entry]

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local")
    .getOrCreate()
}

推荐阅读