python - 一段时间后停止 Spark 会话 - Pyspark
问题描述
我在 spark 中做 ETL,有时需要很多时间。我想在一段时间后优雅地关闭 spark 会话。
我正在 Pyspark 中编写我的代码。
try:
df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
spark.stop()
我想在上面的代码中的某个时间后停止火花。
有没有办法在一段时间后优雅地关闭火花会话?
解决方案
我建议使用官方 python Timer优雅地停止 Spark 会话:
import threading
def timer_elapsed():
print('Timer elapsed')
if not sc._jsc.sc().isStopped():
spark.stop()
# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()
try:
df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
print('Spark job finished successfully.')
except Exception as e:
spark_timer.cancel() # stop timer, we don't need to wait if error occured
if not sc._jsc.sc().isStopped():
spark.stop()
注意:如果时间已过或捕获到异常,我们会在两种情况下停止会话。sc._jsc.sc().isStopped
在请求停止 Spark 上下文之前,我们检查直接调用 Java API的上下文是否处于活动状态。
推荐阅读
- entity-framework - 如何解决 entitychangetracker?
- c# - 找不到 SqlDataAdapter 类型或命名空间
- firebase - 从firebase(不是整个项目或数据库)中恢复意外删除的数据
- c++ - c ++ WebBrowser块url
- python - 在 Bokeh Google Map Offline 上重新绘制
- php - 多条 SQL 输出
- python-3.x - Create multiple objects from list of strings python
- c++ - 我应该为地图使用哪种数据结构?(C++)
- java - Android开发:ID在变量中时如何访问字符串?
- ios - Object Swift 中的 Array 中的对象