首页 > 解决方案 > 使用 Python 多处理库和非守护进程嵌套进程,我如何检测父进程的停止/死亡?

问题描述

这是我的骨架架构(注意它不包含DataComputation run()方法中持久子线程的产生/杀死):

import multiprocessing as mp
import os
import time
from multiprocessing.connection import Connection
from loguru import logger
import psutil

class DataComputation:
    def __init__(self):
        self.tasking_pipe = mp.Pipe()

    # Return tasking pipe endpoint for main process to use
    def get_tasking_pipe_parent(self) -> Connection:
        pipe_child, pipe_parent = self.tasking_pipe
        return pipe_parent

    # Supporting calculation function for run()
    def do_calculation(data):
        return data(...)

    # Main operation function
    def run(self) -> None:
        # Deconstruct Tasking Pipe
        current_pipe_end, external_pipe_end = self.tasking_pipe
        # Run until kill instruction received
        while True:
            # Receive tasking data; Break if kill instruction received
            tasking_data = external_pipe_end.recv()
            if tasking_data == None: break
            # Do something meaningful with tasking data
            process = mp.Process(target = do_calculation, args=(tasking_data))
            processed_data = process.start()
            # Send processed data to main Algorithm
            current_pipe_end.send(processed_data)

    # NON-GRACEFUL FAILSAFE: Thread that watches for parent death; kills siblings on event
    def watcher_thread(self, siblings_list) -> None:
        while True:
            parent_process = psutil.Process(os.getppid())
            if parent_process.status != psutil.STATUS_RUNNING: # *****THIS DOES NOT WORK******
                break
            time.sleep(1)
        # Invoke kill threads function with 'None' as process_list to trigger built in exception
        self.kill_threads(siblings_list)

    # Kill all threads in supplied process list via None message sent through process queue
    def kill_threads(self, process_list) -> None:
        # Attempt graceful, thread suicide via call to process pipe
        pipe_parent = self.get_tasking_pipe_parent()
        for process in process_list:
            pipe_parent.send(None)

        # Await threads to terminate
        for process in process_list:
            process.join()


class MainAlgorithm:
    def __init__(self):
        # Create Data Computation class instance; store communication pipe for tasking
        self.data_computation_class = DataComputation()
        self.tasking_pipe = self.data_computation_class.get_tasking_pipe_parent()

    def run(self):
        # Create Data Computation class "run" process; Provides "API" calls through tasking pipe
        data_comp_run_process = mp.Process(target=self.data_computation_class.run, args=())
        data_comp_run_process.daemon = False
        process_list = [data_comp_run_process]

        # Spawn watcher thread to kill data_comp_run_process in case of non-graceful MainAlgorithm shutdown
        data_comp_watcher = mp.Process(target=self.data_computation_class.watcher_thread, args=([process_list]))
        data_comp_watcher.daemon = False
        data_comp_watcher.start()

        # Start Data Computation class "run" process
        for process in process_list:
            process.start()

        # Do something meaningful
        return_data = 0
        for int in range(10):
            self.tasking_pipe.send(int)
            return_data += self.tasking_pipe.recv()

        # OH NO SYNTAX ERROR AND MAIN PROCESS HALT
        x = None
        y = x + 1
        # ^^^^^^^ PROGRAM STOPS ^^^^^^^ HOW DO I DETECT THIS IN THE CHILD WATCHER THREAD AND KILL THE CHILDREN THREADS

        # Gracefully kill Data Computation class "run" process and subsequently kill watcher process
        self.tasking_pipe.send(None)
        for process in process_list:
            process.join()
        data_comp_watcher.terminate()
        data_comp_watcher.join()

        return return_data


if __name__ == "__main__":
    alg_instance = MainAlgorithm()
    data = alg_instance.run()

问题是我必须为此目的使用嵌套进程,因此,使用非守护进程。这意味着,子线程不会在优雅/非优雅主进程终止时自动获得。此外,该解决方案需要对 Docker 容器友好。

任何帮助,无论如何,将不胜感激!感谢您的时间!

标签: python-3.xconcurrencymultiprocessing

解决方案


推荐阅读