How can I make a Python driver script exit while the stream keeps working in Spark?

Problem description

My Scala program starts streaming from Kafka and then exits back to the command shell. The streaming keeps working, and its application stays visible on the Spark UI page.

My Scala program looks like this:

val ssc = SparkHelper.getStreamingContext(spark, checkpointDir)
val dataFlowStream = KafkaHelper.getKafkaStream[DataFlow, DataFlowDeserializer](
  ssc,
  Topics.dataFlowTopic,
  config.getString("trackrecordsmetrics.kafka.group-id"),
  classOf[DataFlowDeserializer])

...

ssc.start()
ssc.awaitTermination()

When I start it, it prints:

21/04/08 08:58:54 INFO Utils: Successfully started service 'driverClient' on port 32841.
21/04/08 08:58:54 INFO TransportClientFactory: Successfully created connection to sparkmaster/IPADDRESS:7077 after 22 ms (0 ms spent in bootstraps)
21/04/08 08:58:54 INFO ClientEndpoint: ... waiting before polling master for driver state
21/04/08 08:58:54 INFO ClientEndpoint: Driver successfully submitted as driver-20210408085854-0016
21/04/08 08:58:59 INFO ClientEndpoint: State of driver-20210408085854-0016 is RUNNING
21/04/08 08:58:59 INFO ClientEndpoint: Driver running on IPADDRESS:45233 (worker-20210405082522-IPADDRESS-45233)
21/04/08 08:58:59 INFO ClientEndpoint: spark-submit not configured to wait for completion, exiting spark-submit JVM.
21/04/08 08:58:59 INFO ShutdownHookManager: Shutdown hook called
21/04/08 08:58:59 INFO ShutdownHookManager: Deleting directory /tmp/spark-105e3428-ba75-47ca-9d3b-20aa48a45898

Then it exits. I can still see its application on the Spark UI page and watch it doing its work. In other words, the spark-submit process exits while the driver and the application keep running on the cluster.
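That is the behaviour of cluster deploy mode: the line "spark-submit not configured to wait for completion, exiting spark-submit JVM" means the driver has already been started on a worker (IPADDRESS:45233 in the log above), so the client process has nothing left to do. The Scala job was presumably submitted with something along these lines (the class name and jar are placeholders, not taken from the question):

spark-submit --master spark://sparkmaster:7077 --deploy-mode cluster --class com.example.DataFlowJob dataflow-job.jar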

In contrast, when I run a similar Python script, it works only while the script is running. The end of my script looks similar:

query = df \
    .writeStream \
    .outputMode('update') \
    .foreachBatch(write_data_frame) \
    .start()

query.awaitTermination()

If I press Ctrl-C, the script ends and the streaming stops. If I remove the query.awaitTermination() line, the streaming never gets going at all.
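For context, a minimal self-contained version of such a script might look like the sketch below; the Kafka servers, topic, and the body of write_data_frame are placeholders rather than values from the question. The key point is that .start() returns immediately and only launches the query on background threads inside the driver process, while query.awaitTermination() blocks that process so the query keeps running. In client mode the driver is the spark-submit process itself, so as soon as it exits (for example via Ctrl-C), the query stops.

from pyspark.sql import SparkSession

def write_data_frame(batch_df, batch_id):
    # Placeholder sink: the real script presumably writes each micro-batch
    # to MariaDB over JDBC.
    batch_df.show()

spark = SparkSession.builder.appName("dataflow-stream").getOrCreate()

# Read from Kafka; bootstrap servers and topic are placeholders.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .load()

# start() only kicks off the query on background threads of this driver process.
query = df.writeStream \
    .outputMode("update") \
    .foreachBatch(write_data_frame) \
    .start()

# Without this call the main thread returns, the driver process exits,
# and the streaming query is torn down with it.
query.awaitTermination()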

Is it possible to submit the Python script so that it keeps running on its own?

I am submitting the Python script like this:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1,org.mariadb.jdbc:mariadb-java-client:2.7.2 MYSCRIPT.py

I cannot specify cluster deploy mode, because the Spark standalone master does not support cluster mode for Python applications.
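Since the Python driver therefore has to run in client mode on the submitting machine, one common workaround (a sketch, not something stated in the question) is to detach the spark-submit process from the shell session, for example with nohup or inside tmux/screen:

nohup spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1,org.mariadb.jdbc:mariadb-java-client:2.7.2 MYSCRIPT.py > myscript.log 2>&1 &

This keeps the driver alive on the submitting machine rather than on a worker, so it is not equivalent to what the Scala job does in cluster mode, but it does let the script outlive the shell session.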

Tags: python, scala, apache-spark

Solution

