apache-spark - 如何在 Spark-YARN 上设置每个任务的最大允许执行时间?
问题描述
我有一个长期运行的 PySpark Structured Streaming 作业,它读取一个 Kafka 主题,进行一些处理并将结果写回另一个 Kafka 主题。我们的 Kafka 服务器在另一个集群上运行。
它运行良好,但每隔几个小时就会冻结,即使在 Web UI 中,YARN 应用程序仍然处于“正在运行”状态。检查日志后,似乎是由于 Kafka 源的一些暂时连接问题。事实上,有问题的微批次的所有任务都已正确完成,除了显示:
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
Spark 或 YARN 未检测到该故障,并且该任务将永远运行(最多几天)并继续每秒打印 10-20 条此类错误消息。重新启动该过程可以解决问题。
在这种情况下是否有可能强制 Spark 任务(在 YARN 上)失败?然后它将自动重新启动并解决问题。当然,任何其他恢复 Kafka 连接的方法也可以......
我知道可以根据最大可接受的内存消耗杀死 YARN 容器,但在执行时间方面没有看到任何类似的东西。
解决方案
我还没有找到使用 YARN 的解决方案,而是使用 Pyspark 驱动程序中的监视循环的解决方法。如果状态 10 分钟未更新,循环将定期检查状态并导致流式应用程序失败
MAX_DURATION = 10*60 # in seconds
df:DataFrame = define_my_data_stream(params)
writer:DataStreamWriter = write_to_my_kafka(df)
qy = writer.start()
prevBatch = -1
while not spark.streams.awaitAnyTermination(defaultMaxDuration):
lastBatch = qy.lastProgress['batchId']
if lastBatch == prevBatch:
qy.stop()
print("Query stopped")
raise RuntimeError("Query '"+(qy.name or "")+"' ("+qy.id+") has stalled")
else:
prevBatch = lastBatch
引发异常将使 Spark 应用程序失败。然后,此故障可以由 YARN 管理,并使用以下选项重新启动应用程序以 spark-submit:
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=4 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
它确实有效:检测到冻结并从检查点重新启动应用程序。但是只能重启一次,好像我没有指定failuresValidityInterval
参数一样。那是另一个问题,Spark的已知问题...
推荐阅读
- android - 颤振剪辑路径
- javascript - React Native:计时器不断重置
- python-multiprocessing - 多处理 - 如何最大化 CPU 使用率?
- c# - 错误:CS1009:无法识别的转义序列
- kotlin - 如何使用高阶函数而不是简单的 for 将这个 Kotlin 代码翻译成更好的代码
- latex - ***(作业中止,未找到合法 \end)LaTex 中的错误
- python - 使用 Python 登录 Gmail 并检测登录失败
- sql-server - ELO ECM 套件 20
- laravel - 如何用背包forlaravel上的html图标替换动作
- google-analytics - 我可以在没有分析的情况下使用 Google AdSense 吗?