python-3.x - 使用 Python 多处理库和非守护进程嵌套进程,我如何检测父进程的停止/死亡?
问题描述
这是我的骨架架构(注意它不包含DataComputation
run()
方法中持久子线程的产生/杀死):
import multiprocessing as mp
import os
import time
from multiprocessing.connection import Connection
from loguru import logger
import psutil
class DataComputation:
def __init__(self):
self.tasking_pipe = mp.Pipe()
# Return tasking pipe endpoint for main process to use
def get_tasking_pipe_parent(self) -> Connection:
pipe_child, pipe_parent = self.tasking_pipe
return pipe_parent
# Supporting calculation function for run()
def do_calculation(data):
return data(...)
# Main operation function
def run(self) -> None:
# Deconstruct Tasking Pipe
current_pipe_end, external_pipe_end = self.tasking_pipe
# Run until kill instruction received
while True:
# Receive tasking data; Break if kill instruction received
tasking_data = external_pipe_end.recv()
if tasking_data == None: break
# Do something meaningful with tasking data
process = mp.Process(target = do_calculation, args=(tasking_data))
processed_data = process.start()
# Send processed data to main Algorithm
current_pipe_end.send(processed_data)
# NON-GRACEFUL FAILSAFE: Thread that watches for parent death; kills siblings on event
def watcher_thread(self, siblings_list) -> None:
while True:
parent_process = psutil.Process(os.getppid())
if parent_process.status != psutil.STATUS_RUNNING: # *****THIS DOES NOT WORK******
break
time.sleep(1)
# Invoke kill threads function with 'None' as process_list to trigger built in exception
self.kill_threads(siblings_list)
# Kill all threads in supplied process list via None message sent through process queue
def kill_threads(self, process_list) -> None:
# Attempt graceful, thread suicide via call to process pipe
pipe_parent = self.get_tasking_pipe_parent()
for process in process_list:
pipe_parent.send(None)
# Await threads to terminate
for process in process_list:
process.join()
class MainAlgorithm:
def __init__(self):
# Create Data Computation class instance; store communication pipe for tasking
self.data_computation_class = DataComputation()
self.tasking_pipe = self.data_computation_class.get_tasking_pipe_parent()
def run(self):
# Create Data Computation class "run" process; Provides "API" calls through tasking pipe
data_comp_run_process = mp.Process(target=self.data_computation_class.run, args=())
data_comp_run_process.daemon = False
process_list = [data_comp_run_process]
# Spawn watcher thread to kill data_comp_run_process in case of non-graceful MainAlgorithm shutdown
data_comp_watcher = mp.Process(target=self.data_computation_class.watcher_thread, args=([process_list]))
data_comp_watcher.daemon = False
data_comp_watcher.start()
# Start Data Computation class "run" process
for process in process_list:
process.start()
# Do something meaningful
return_data = 0
for int in range(10):
self.tasking_pipe.send(int)
return_data += self.tasking_pipe.recv()
# OH NO SYNTAX ERROR AND MAIN PROCESS HALT
x = None
y = x + 1
# ^^^^^^^ PROGRAM STOPS ^^^^^^^ HOW DO I DETECT THIS IN THE CHILD WATCHER THREAD AND KILL THE CHILDREN THREADS
# Gracefully kill Data Computation class "run" process and subsequently kill watcher process
self.tasking_pipe.send(None)
for process in process_list:
process.join()
data_comp_watcher.terminate()
data_comp_watcher.join()
return return_data
if __name__ == "__main__":
alg_instance = MainAlgorithm()
data = alg_instance.run()
问题是我必须为此目的使用嵌套进程,因此,使用非守护进程。这意味着,子线程不会在优雅/非优雅主进程终止时自动获得。此外,该解决方案需要对 Docker 容器友好。
任何帮助,无论如何,将不胜感激!感谢您的时间!
解决方案
推荐阅读
- html - 如何使用 flexbox 旋转第一个元素并使其全高?
- angular - 角库找不到模块'name-lib'
- extract - raster::extract 无法生成预期的 data.frame
- jestjs - Playwright JS:如何在玩笑测试中为失败的测试用例截屏
- data-transfer - 在多个客户端之间将少量数据传输到一个服务器
- c - 前向声明 C-Struct 会产生“不兼容的类型”-警告
- r - 如何实际调用承诺?
- python - pandas 按列分组并填充另一列的空值
- excel - 用编辑模式替换输入模式
- c++ - 在我的 CmakeList 中添加外部库