首页 > 解决方案 > 超时退出慢速 Spark 映射,但保留目前的结果

问题描述

我正在使用非常昂贵的功能(每行可能数十秒)映射 Spark RDD。

这项工作可能需要太长时间,我需要中止它,以便为我们数据流中的其他工作让路。

但是,到目前为止计算的结果对我仍然有用,所以我不想丢弃它们,特别是因为它们可能已经花费了几个小时来计算。

有没有办法在超时时提前退出转换,但保留到目前为止计算的部分结果

标签: apache-spark

解决方案


至少有两种方法可以做到这一点,通过从map相关转换切换:

地图分区

mapPartitions使我们可以访问每个分区上的迭代器,因此如果超时已过期,我们可以简单地假装其中没有项目:

val data = sc.parallelize(1 to 100)

val timeout = 10000

val start = System.currentTimeMillis

data.repartition(10).mapPartitions { iter =>
  if (System.currentTimeMillis - start > timeout) Iterator.empty
  else iter.map(x => { Thread.sleep(500); x + 1 })
}.count

根据您的环境,您可能需要在此 spark-shell 示例中调整超时,但它应该会产生不同数量的结果,具体取决于您运行相对于 setting 的转换的时间start

请注意,分区数必须明显高于执行程序核心的总数,否则所有分区将立即启动,没有什么可跳过的。因此,对于这个例子,我们在开始之前明确地重新分区数据mapPartitions。您可能需要也可能不需要这样做,具体取决于数据的大小和预置的核心数量。

平面图

更细粒度的方法是使用flatMap,它允许我们有条件地处理或跳过每个单独的行,通过一个返回 an 的函数Option(并且仅None在超时已过期时返回);

// setup as before
data.flatMap{ x => if (System.currentTimeMillis - start > timeout) None
                   else Some({Thread.sleep(500); x + 1}) }.count

这种方法不需要分区,但即使在超时之后也会扫描所有剩余的分区(但不对任何行执行昂贵的处理)。


推荐阅读