apache-spark - 如何为火花任务或地图操作设置超时?(或跳过长时间运行的任务)
问题描述
我正在使用 spark 并行化一百万个任务。例如,训练一百万个单独的模型。
我需要确保尽可能多的成功,但尽量少失败。在 spark 中,如果只有一个模型找不到最佳解决方案,它可能会被挂起并永远运行。在这种情况下,spark 作业永远不会完成,并且终止此作业不会将其他 999,999 个模型保存到 hdfs 。
这个问题真的很伤人。
我四处搜寻,但没有发现任何有用的东西:
spark.task.maxFailures
: 没有失败,所以这个不生效。spark.network.timeout
: 没有网络问题。spark.executor.heartbeatInterval
: 没有亲戚。
核心训练代码,主要使用rdd.map进行训练
df1 = (df.rdd
.map(lambda r: r.asDict())
.map(lambda d: transform_data(d))
.map(lambda d: create_model(d))
.map(lambda d: model_fit(d))
.map(lambda d: pickle_model(d))
)
如何为火花任务设置超时?或者有什么好的办法吗?
解决方案
我不认为这可以是配置级别的控制器。您可能只想将其应用于 spark 任务的一个子集。SparkListener
可以帮助解决这个问题,因为您可以在任务、阶段、作业级别挂钩,然后使用sparkContenxt
.
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart): Unit
在上面你可以实现超时逻辑。
可以使用 sparkContext 杀死特定任务,使用def cancelStage(stageId: Int)
您可以从侦听器事件中获取特定的 id
推荐阅读
- python - 无法更改 numpy 数组中的值
- android - React 本机应用程序发布版本不适用于某些 android 设备
- reactjs - 无法读取未定义的属性“直径”
- azure - 如何使用 Service Fabric 修改 Azure VM 规模集中的磁盘分区
- php - Laravel 的酒店房间列表
- c++ - 赋值或回调函数的 C++ 重载模板
- c# - 有没有办法从 API 的模板中将自定义字段添加到文档中,可以通过 DocuSign 控制台进一步使用?
- java - 如何使用 jmrtd 和 scuba 从智能卡中读取数据
- javascript - 如何使用 JavaScript 循环将 html div 元素输出 100 次?
- types - 如何理解 Idris lang 中 List 和 Vect 的类型声明和定义?