apache-spark - 超时退出慢速 Spark 映射,但保留目前的结果
问题描述
我正在使用非常昂贵的功能(每行可能数十秒)映射 Spark RDD。
这项工作可能需要太长时间,我需要中止它,以便为我们数据流中的其他工作让路。
但是,到目前为止计算的结果对我仍然有用,所以我不想丢弃它们,特别是因为它们可能已经花费了几个小时来计算。
有没有办法在超时时提前退出转换,但保留到目前为止计算的部分结果?
解决方案
至少有两种方法可以做到这一点,通过从map
相关转换切换:
地图分区
mapPartitions
使我们可以访问每个分区上的迭代器,因此如果超时已过期,我们可以简单地假装其中没有项目:
val data = sc.parallelize(1 to 100)
val timeout = 10000
val start = System.currentTimeMillis
data.repartition(10).mapPartitions { iter =>
if (System.currentTimeMillis - start > timeout) Iterator.empty
else iter.map(x => { Thread.sleep(500); x + 1 })
}.count
根据您的环境,您可能需要在此 spark-shell 示例中调整超时,但它应该会产生不同数量的结果,具体取决于您运行相对于 setting 的转换的时间start
。
请注意,分区数必须明显高于执行程序核心的总数,否则所有分区将立即启动,没有什么可跳过的。因此,对于这个例子,我们在开始之前明确地重新分区数据mapPartitions
。您可能需要也可能不需要这样做,具体取决于数据的大小和预置的核心数量。
平面图
更细粒度的方法是使用flatMap
,它允许我们有条件地处理或跳过每个单独的行,通过一个返回 an 的函数Option
(并且仅None
在超时已过期时返回);
// setup as before
data.flatMap{ x => if (System.currentTimeMillis - start > timeout) None
else Some({Thread.sleep(500); x + 1}) }.count
这种方法不需要分区,但即使在超时之后也会扫描所有剩余的分区(但不对任何行执行昂贵的处理)。
推荐阅读
- ace-editor - Ace2 代码编辑器 php 内联突出显示不再起作用
- python - 创建一堆重复的 keras 模型并绕过部分输入
- json - 如何从字符串行解析 json 对象?
- javascript - 当我们点击空格键时,令牌输入中的问题
- c# - Npgsql.PostgresException (0x80004005): 42883: 运算符不存在: jsonb #> text
- java - 这是避免短路评估的好方法吗?
- python - 从N个点中可以找到多少个三角形,其中有N个点的质心?
- c++ - 如果将两个相同的指针作为输入传递,memcmp 会做什么?
- python-3.x - Python 的字节类型实际用于什么?
- css - 如何修复 mat-paginator 大小