python - How to manage a task queue in Python and run its tasks in parallel across several computers?
Problem description
I am looking for a Python library that lets me: manage a queue of tasks, run tasks in parallel (on one or several computers), allow a task to generate other tasks in the queue, and work on both UNIX and Windows.
I have read some documentation on Celery, RQ, SCoOP and multiprocessing on the task-manager side, and on Redis, RabbitMQ and ZMQ on the message-broker side, but I really don't know which is the best option.
Solution
Consider the Python multiprocessing library.
It offers many multiprocessing options, such as running several processes as a pool of workers that consume a shared work queue. It runs on a single server, but you could implement a connector that executes the work on another server (for example, over SSH, running the Python executable remotely).
Otherwise, I don't know of a Python library that works across servers and across platforms. For that you might need a containerised application, something like Kubernetes.
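Before the fuller sample below, the pool-of-workers idea can be sketched with nothing but the standard library. This is a minimal sketch under stated assumptions: the names (`worker`, `run_demo`) and the squaring stand-in for real work are illustrative, not part of any particular API.

```python
import multiprocessing as mp

def worker(task_queue, result_queue):
    """Consume task ids until a None sentinel arrives."""
    while True:
        task_id = task_queue.get()
        if task_id is None:                      # sentinel: shut this worker down
            return
        result_queue.put(task_id * task_id)      # stand-in for real work

def run_demo(num_workers=2, num_tasks=5):
    task_queue = mp.Queue()
    result_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for task_id in range(num_tasks):
        task_queue.put(task_id)
    for _ in workers:                            # one sentinel per worker
        task_queue.put(None)
    # Drain the results before joining, so no worker blocks on a full pipe
    results = [result_queue.get() for _ in range(num_tasks)]
    for w in workers:
        w.join()
    return sorted(results)

if __name__ == '__main__':
    print(run_demo())
```

Draining the result queue before `join()` avoids the documented pitfall where a child that has put items on a queue cannot exit until those items are flushed.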
Below is some sample code I wrote that adds "run ids" to a queue representing runnable tasks. These can then be executed in parallel by a pool of workers.
import logging
import time
from multiprocessing import Queue, Pool
from queue import Empty  # on Python 2: from Queue import Empty

# For writing to logs when using multiprocessing
from multiprocessing_logging import install_mp_handler


class RuntimeHelper:
    """
    Wrapper around your "runtime" which can execute runs and is persistent
    within a worker process.
    """
    def __init__(self):
        # Implement your own code here.
        # Do some initialisation such as creating DB connections etc.
        # Will be done once per worker when the worker starts.
        pass

    def execute_run(self, run_id):
        # Implement your own code here to actually do the Run/Task.
        # In this case we just sleep for 30 secs instead of doing any real work.
        time.sleep(30)


def worker(run_id_queue):
    """
    Executed once per process by multiprocessing.Pool (passed as the
    Pool's initializer, so every worker runs this loop until killed).
    :param run_id_queue: the process-safe Queue of run_ids to consume
    """
    helper = RuntimeHelper()
    # Iterate runs until death
    logging.info("Starting")
    while True:
        try:
            run_id = run_id_queue.get_nowait()
            # run_id=None is a signal for this process to die.
            # An empty queue means: don't die, the queue is just empty
            # for now and more work could be added soon.
            if run_id is not None:
                logging.info("run_id={0}".format(run_id))
                helper.execute_run(run_id)
            else:
                logging.info("Kill signal received")
                return True
        except Empty:
            # Wait X seconds before checking for new work
            time.sleep(15)


if __name__ == '__main__':
    num_processes = 10
    check_interval_seconds = 15
    max_runtime_seconds = 60 * 15

    # ==========================================
    # INITIALISATION
    # ==========================================
    logging.basicConfig(level=logging.INFO)  # otherwise INFO messages are dropped
    install_mp_handler()  # Must be called before the Pool is created
    queue = Queue()
    pool = Pool(num_processes, worker, (queue,))
    # don't forget the comma here ^

    # ==========================================
    # LOOP
    # ==========================================
    logging.info('Starting to do work')
    # Naive wait-loop implementation
    max_iterations = max_runtime_seconds // check_interval_seconds
    for i in range(max_iterations):
        # Add work
        ready_runs = ...  # <Your code to get some runs>
        for ready_run in ready_runs:
            queue.put(ready_run.id)
        # Sleep while some of the runs are busy
        logging.info('Main thread sleeping {0} of {1}'.format(i, max_iterations))
        time.sleep(check_interval_seconds)

    # Empty the queue of work and send the kill signal (run_id=None)
    logging.info('Finishing up')
    while True:
        try:
            queue.get_nowait()
        except Empty:
            break
    for i in range(num_processes):
        queue.put(None)

    logging.info('Waiting for subprocesses')
    # Wait for the pool to finish what it is busy with
    pool.close()
    pool.join()
    logging.info('Done')
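One requirement from the question that the sample above does not demonstrate is a task spawning further tasks. With multiprocessing this falls out naturally, because a worker can put onto the same queue it consumes from. Here is a minimal sketch; the countdown scheme (task n enqueues task n-1) and all names are hypothetical, chosen only so that the total number of tasks is known in advance:

```python
import multiprocessing as mp

def worker(task_queue, result_queue):
    """Consume task ids; each task may enqueue a follow-up task."""
    while True:
        task_id = task_queue.get()
        if task_id is None:                  # sentinel: shut down
            return
        if task_id > 0:
            task_queue.put(task_id - 1)      # the task spawns another task
        result_queue.put(task_id)            # report completion

def run_demo(start=3, num_workers=2):
    task_queue, result_queue = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    task_queue.put(start)                    # seed a single initial task
    # The countdown scheme makes completion detection trivial: exactly
    # start + 1 tasks ever run. In general, deciding when a self-spawning
    # workload is finished needs its own bookkeeping.
    done = [result_queue.get() for _ in range(start + 1)]
    for _ in workers:
        task_queue.put(None)                 # one sentinel per worker
    for w in workers:
        w.join()
    return sorted(done)

if __name__ == '__main__':
    print(run_demo())
```

Note that with self-spawning tasks you cannot simply send the kill sentinels when the queue looks empty, since an in-flight task may still enqueue more work; some completion-counting scheme like the one above is needed.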