python - 在已经运行的子进程上运行一个函数(在一个新线程中)
问题描述
我有一些昂贵的长时间运行的功能,我想在多个内核上运行。多处理很容易做到这一点。但我还需要定期运行一个函数,该函数根据特定进程的状态(全局变量)计算值。我认为这应该可以通过简单地在子进程上产生一个线程来实现。
这是一个简化的例子。请建议我如何打电话procces_query_state()
。
import multiprocessing
import time
def process_runner(x: int):
global xx
xx = x
while True:
time.sleep(0.1)
xx += 1 # actually an expensive calculation
def process_query_state() -> int:
y = xx * 2 # actually an expenseive calculation
return y
def main():
processes = {}
for x in range(10):
p = multiprocessing.get_context('spawn').Process(target=process_runner, args=(x,))
p.start()
processes[x] = p
while True:
time.sleep(1)
print(processes[3].process_query_state()) # this doesn't actually work
if __name__ == '__main__':
main()
解决方案
我看到两个问题:
Process
is notRPC
(Remote Procedure Call
) 并且您不能process_query_state
从主进程执行其他功能。您只能用于queue
向其他进程发送一些信息 - 但此进程必须定期检查是否有新消息。Process
只能运行一个函数,因此当它收到消息以运行另一个函数时它将停止一个函数,或者它必须threads
在 newprocesses
上运行才能同时运行许多函数。
编辑:它可能会带来其他问题 - 如果两个函数同时处理相同的数据,那么一个函数可以在另一个函数使用旧值之前更改值,这可能会产生错误的结果。
我创建了使用队列向其发送消息的示例,process_runner
它会定期检查是否有消息并运行process_query_state
,并将结果发送回主进程。
主进程等待来自选定进程的结果——它会阻塞代码——但如果你想使用更多进程,那么它必须让它变得更复杂。
import multiprocessing
import time
def process_query_state():
y = xx * 2 # actually an expenseive calculation
return y
def process_runner(x: int, queue_in, queue_out):
global xx
xx = x
# reverse direction
q_in = queue_out
q_out = queue_in
while True:
time.sleep(0.1)
xx += 1 # actually an expensive calculation
# run other function - it will block main calculations
# but this way it will use correct `xx` (other calculations will not change it)
if not q_in.empty():
if q_in.get() == 'run':
result = process_query_state()
q_out.put(result)
def main():
processes = {}
for x in range(4):
ctx = multiprocessing.get_context('spawn')
q_in = ctx.Queue()
q_out = ctx.Queue()
p = ctx.Process(target=process_runner, args=(x, q_in, q_out))
p.start()
processes[x] = (p, q_in, q_out)
while True:
time.sleep(1)
q_in = processes[3][1]
q_out = processes[3][2]
q_out.put('run')
# non blocking version
#if not q_in.empty():
# print(q_in.get())
# blocking version
print(q_in.get())
if __name__ == '__main__':
main()
推荐阅读
- python - Alexa Skill Development 使用 flask-ask 和 ngrok 错误
- python - Selenium 使用 scrapy-selenium 模块从多个 JavaScript 页面中抓取数据
- c++ - 将 std::u16string 转换为 NSString
- ruby-on-rails - 未定义的方法“alias_method_chain”将 Spree 从 3.2 更新到 3.3
- flutter - Flutter 缓存问题:未为“Book”类型定义运算符“[]”
- elasticsearch - 有没有办法阻止访问 Elasticsearch 内置 API,例如 /_nodes?, /_cluster, /_cat 等?
- angular - 调试谷歌浏览器中不显示角度错误
- angular - 使用 Pact 和 Angular 进行 API 合约测试
- sequelize.js - 使用 Sequelize 迁移的数据库字段的默认值
- python - 如何从 DataFrame 中删除某些列只有零值的行