首页 > 解决方案 > 如何在 Spark-YARN 上设置每个任务的最大允许执行时间?

问题描述

我有一个长期运行的 PySpark Structured Streaming 作业,它读取一个 Kafka 主题,进行一些处理并将结果写回另一个 Kafka 主题。我们的 Kafka 服务器在另一个集群上运行。

它运行良好,但每隔几个小时就会冻结,即使在 Web UI 中,YARN 应用程序仍然处于“正在运行”状态。检查日志后,似乎是由于 Kafka 源的一些暂时连接问题。事实上,有问题的微批次的所有任务都已正确完成,除了显示:

21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy

Spark 或 YARN 未检测到该故障,并且该任务将永远运行(最多几天)并继续每秒打印 10-20 条此类错误消息。重新启动该过程可以解决问题。

在这种情况下是否有可能强制 Spark 任务(在 YARN 上)失败?然后它将自动重新启动并解决问题。当然,任何其他恢复 Kafka 连接的方法也可以......

我知道可以根据最大可接受的内存消耗杀死 YARN 容器,但在执行时间方面没有看到任何类似的东西。

标签: apache-sparkapache-kafkahadoop-yarnspark-structured-streaming

解决方案


我还没有找到使用 YARN 的解决方案,而是使用 Pyspark 驱动程序中的监视循环的解决方法。如果状态 10 分钟未更新,循环将定期检查状态并导致流式应用程序失败

MAX_DURATION = 10*60 # in seconds

df:DataFrame = define_my_data_stream(params)
writer:DataStreamWriter = write_to_my_kafka(df)

qy = writer.start()

prevBatch = -1
while not spark.streams.awaitAnyTermination(defaultMaxDuration):
    lastBatch = qy.lastProgress['batchId']
    if lastBatch == prevBatch:
        qy.stop()
        print("Query stopped")
        raise RuntimeError("Query '"+(qy.name or "")+"' ("+qy.id+") has stalled")
    else:
        prevBatch = lastBatch

引发异常将使 Spark 应用程序失败。然后,此故障可以由 YARN 管理,并使用以下选项重新启动应用程序以 spark-submit:

--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=4 \
--conf spark.yarn.executor.failuresValidityInterval=1h \

它确实有效:检测到冻结并从检查点重新启动应用程序。但是只能重启一次,好像我没有指定failuresValidityInterval参数一样。那是另一个问题,Spark的已知问题...


推荐阅读