Python3 How to gracefully shutdown a multiprocess application

Problem Description

I am trying to fix a Python 3 application in which multiple processes and threads are created and controlled by various queues and pipes. I am trying to implement a form of controlled exit when someone breaks the program with Ctrl-C. However, no matter what I do, it always hangs right at the end.

I've tried using a KeyboardInterrupt exception handler and signal catching. The code below is part of the multiprocess code.

import signal
import sys

from multiprocessing import Process, Pipe, JoinableQueue as Queue, Event

class TaskExecutor(Process):
    def __init__(....):
        {inits}

    def signal_handler(self, sig, frame):
        print('TaskExecutor closing')
        self._in_p.close()
        sys.exit(1)

    def run(self):
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        while True:
            # Get the Task Group name from the Task queue.
            try:
                ExecCmd = self._in_p.recv()  # type: TaskExecCmd
            except Exception as e:
                self._in_p.close()
                return
            if ExecCmd.Kill:
                self._log.info('{:30} : Kill Command received'.format(self.name))
                self._in_p.close()
                return
            else:
                {other code executing here}

I'm getting the print above that it's closing, but I'm still getting a lot of different exceptions that I try to catch but cannot.

I'm looking for documentation on how, and in which order, to shut down the subprocesses and the main process.

I know it's a very general question, but it's a very large application, so if there are any questions or things I could test, I could narrow it down.

Regards

Tags: multiprocessing, queue, python-3.7

Solution


So after investigating this issue further, I found that in a situation where I had a pipe thread, a queue thread and 4 worker processes running, some of these processes could end up hanging when terminating the application with Ctrl-C, even though the pipe and queue processes were already shut down.

In the multiprocessing documentation there is a warning:

Warning If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

And I think this is what's happening. I also found that even though I have a shutdown mechanism in my multiprocess class, any threads still running inside the process would of course be reported as alive (is_alive() returns True) even though I know the run() method has returned, i.e. something internal was hanging.

Now for the solution. By design, my worker processes were not daemons, because I wanted to control their shutdown. However, I changed them to daemons so they would always be killed regardless. I then made any kill signal raise a ProgramKilled exception throughout my entire program:

class ProgramKilled(Exception):
    pass

def signal_handler(signum, frame):
    raise ProgramKilled('Task Executor killed')
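As a self-contained sanity check, this handler can be verified in-process by sending SIGINT to ourselves (a minimal sketch; the handler installation mirrors the signal.signal calls shown in the question, and it assumes a POSIX system where os.kill can deliver SIGINT to the current process):

```python
import os
import signal

class ProgramKilled(Exception):
    """Raised anywhere in the program when a kill signal arrives."""

def signal_handler(signum, frame):
    raise ProgramKilled('Task Executor killed')

# Install the handler for Ctrl-C, as each process does at the top of run().
signal.signal(signal.SIGINT, signal_handler)

caught = False
try:
    os.kill(os.getpid(), signal.SIGINT)  # simulate Ctrl-C
    # Give the interpreter a chance to run the Python-level handler,
    # which fires between bytecode instructions.
    for _ in range(1000):
        pass
except ProgramKilled:
    caught = True

print(caught)
```

The point is that after this, a blocking call such as `Pipe.recv()` is interrupted by a catchable exception instead of killing the process outright.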

I then changed the shutdown mechanism in my multiprocess class to:

while True:
    # Get the Task Group name from the Task queue.
    try:
        # Reading from pipe
        ExecCmd = self._in_p.recv()  # type: TaskExecCmd
    # If it's a fatal error, just close it all
    except BrokenPipeError:
        break
    # This can occur; close the pipe and break the loop
    except EOFError:
        self._in_p.close()
        break
    # Exception for when a kill signal is detected.
    # Mark the process as killed (just waiting for the kill command from main)
    except ProgramKilled:
        self._log.info('{:30} : Died'.format(self.name))
        self._KilledStatus = True
        continue
    # Kill command from main received.
    # Shut down all we can; ignore exceptions.
    if ExecCmd.Kill:
        self._log.info('{:30} : Kill Command received'.format(self.name))
        try:
            self._in_p.close()
            self._out_p.join()
        except Exception:
            pass
        self._log.info('{:30} : Kill Command executed'.format(self.name))
        break
    elif not self._KilledStatus:
        {Execute code}

# When out of the loop, set the killed event
KilledEvent.set()
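A stripped-down, runnable version of that loop looks like this (a sketch only: `TaskExecCmd` here is a hypothetical stand-in for the real command class, and the logging and `_KilledStatus` bookkeeping are omitted):

```python
from collections import namedtuple
from multiprocessing import Event, Pipe

# Hypothetical stand-in for the TaskExecCmd objects sent over the pipe;
# the real class presumably carries more fields than a Kill flag.
TaskExecCmd = namedtuple('TaskExecCmd', ['Kill', 'payload'])

def worker_loop(conn, killed_event):
    """Drain commands from `conn` until a Kill command, EOF or broken pipe."""
    results = []
    while True:
        try:
            cmd = conn.recv()
        except BrokenPipeError:   # fatal: bail out immediately
            break
        except EOFError:          # writer side closed: clean up and stop
            conn.close()
            break
        if cmd.Kill:              # explicit kill command from main
            try:
                conn.close()
            except Exception:
                pass
            break
        results.append(cmd.payload)  # stand-in for the real task execution
    killed_event.set()               # tell the main process we are done
    return results

parent_end, child_end = Pipe()
done = Event()

# Queue up two work items and then the kill command before draining,
# so recv() never blocks in this single-process demonstration.
parent_end.send(TaskExecCmd(False, 'job-1'))
parent_end.send(TaskExecCmd(False, 'job-2'))
parent_end.send(TaskExecCmd(True, None))

out = worker_loop(child_end, done)
print(out, done.is_set())
```

Note that the event is set on every exit path, which is what lets the main process wait on it safely.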

And in my main thread I have added the following clean-up process:

# Loop through all my resources
for ThreadInterfaces in ResourceThreadDict.values():
    # Test each process in each resource
    for ThreadIf in ThreadInterfaces:
        # Wait for its event to be set
        ThreadIf['KillEvent'].wait()
        # When the event has been received, see if the process is hanging.
        # We know at this point everything has been closed and all data has
        # been purged correctly, so if it's still alive, terminate it.
        if ThreadIf['Thread'].is_alive():
            try:
                psutil.Process(ThreadIf['Thread'].pid).terminate()
            except (psutil.NoSuchProcess, AttributeError):
                pass

After a lot of testing, I know it's really hard to control the termination of an app with multiple processes, because you simply do not know in which order all of your processes receive the signal.

I've tried to save most of my data when the app is killed. Some would argue: what do I need that data for when manually terminating the app? But in this case the app runs a lot of external scripts and other applications, and any of those can lock up the application; then you need to kill it manually but still retain the information about what has already been executed.

So this is my solution to my current problem with my current knowledge. Any input or more in-depth knowledge on what is happening is welcome. Please note that this app runs on both Linux and Windows.

Regards

