首页 > 解决方案 > 在已经运行的子进程上运行一个函数(在一个新线程中)

问题描述

我有一些昂贵的长时间运行的功能,我想在多个内核上运行。多处理很容易做到这一点。但我还需要定期运行一个函数,该函数根据特定进程的状态(全局变量)计算值。我认为这应该可以通过简单地在子进程上产生一个线程来实现。

这是一个简化的例子。请建议我如何打电话procces_query_state()

import multiprocessing
import time

def process_runner(x: int):
    global xx
    xx = x
    while True:
        time.sleep(0.1)
        xx += 1  # actually an expensive calculation

def process_query_state() -> int:
    y = xx * 2 # actually an expenseive calculation
    return y

def main():
    processes = {}
    for x in range(10):
        p = multiprocessing.get_context('spawn').Process(target=process_runner, args=(x,))
        p.start()
        processes[x] = p
    while True:
        time.sleep(1)
        print(processes[3].process_query_state()) # this doesn't actually work

if __name__ == '__main__':
    main()

标签: pythonmultiprocessing

解决方案


我看到两个问题:

  1. Processis not RPC( Remote Procedure Call) 并且您不能process_query_state从主进程执行其他功能。您只能用于queue向其他进程发送一些信息 - 但此进程必须定期检查是否有新消息。

  2. Process只能运行一个函数,因此当它收到消息以运行另一个函数时它将停止一个函数,或者它必须threads在 newprocesses上运行才能同时运行许多函数。

编辑:它可能会带来其他问题 - 如果两个函数同时处理相同的数据,那么一个函数可以在另一个函数使用旧值之前更改值,这可能会产生错误的结果。


我创建了使用队列向其发送消息的示例,process_runner它会定期检查是否有消息并运行process_query_state,并将结果发送回主进程。

主进程等待来自选定进程的结果——它会阻塞代码——但如果你想使用更多进程,那么它必须让它变得更复杂。

import multiprocessing
import time

def process_query_state():
    y = xx * 2 # actually an expenseive calculation
    return y

def process_runner(x: int, queue_in, queue_out):
    global xx
    xx = x

    # reverse direction
    q_in = queue_out
    q_out = queue_in
    
    while True:
        time.sleep(0.1)
        xx += 1  # actually an expensive calculation

        # run other function - it will block main calculations 
        # but this way it will use correct `xx` (other calculations will not change it)
        if not q_in.empty():
            if q_in.get() == 'run':
                result = process_query_state()
                q_out.put(result)
                
def main():
    processes = {}
    for x in range(4):
        ctx = multiprocessing.get_context('spawn')
        q_in  = ctx.Queue()
        q_out = ctx.Queue()
        p = ctx.Process(target=process_runner, args=(x, q_in, q_out))
        p.start()
        processes[x] = (p, q_in, q_out)
        
    while True:
        time.sleep(1)
        q_in  = processes[3][1]
        q_out = processes[3][2]
        q_out.put('run')
        
        # non blocking version
        #if not q_in.empty():
        #    print(q_in.get())
        
        # blocking version
        print(q_in.get())

if __name__ == '__main__':
    main()

推荐阅读