首页 > 解决方案 > 如何为火花任务或地图操作设置超时?(或跳过长时间运行的任务)

问题描述

我正在使用 spark 并行化一百万个任务。例如,训练一百万个单独的模型。

我需要确保尽可能多的成功,但尽量少失败。在 spark 中,如果只有一个模型找不到最佳解决方案,它可能会被挂起并永远运行。在这种情况下,spark 作业永远不会完成,并且终止此作业不会将其他 999,999 个模型保存到 hdfs 。
这个问题真的很伤人。

我四处搜寻,但没有发现任何有用的东西:

核心训练代码,主要使用rdd.map进行训练

df1 = (df.rdd
      .map(lambda r: r.asDict())
      .map(lambda d: transform_data(d))
      .map(lambda d: create_model(d))
      .map(lambda d: model_fit(d))
      .map(lambda d: pickle_model(d))
)

如何为火花任务设置超时?或者有什么好的办法吗?

标签: apache-sparkpysparktimeoutjob-scheduling

解决方案


我不认为这可以是配置级别的控制器。您可能只想将其应用于 spark 任务的一个子集。SparkListener可以帮助解决这个问题,因为您可以在任务、阶段、作业级别挂钩,然后使用sparkContenxt.

 /**
   * Called when a task starts
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

在上面你可以实现超时逻辑。

可以使用 sparkContext 杀死特定任务,使用def cancelStage(stageId: Int)

您可以从侦听器事件中获取特定的 id


推荐阅读