首页 > 解决方案 > 在多个不同的工作人员之间共享排队的工作负载

问题描述

关于

我有一个DataRetriever需要使用 API 凭据实例化的类。我有五组不同的 API 凭据,因此我想实例化DataRetriever. DataRetriever只有一个公共方法retrieve,顾名思义,它会subprocess根据id传递给该方法的参数来检索一些数据。

目前的方法

我正在使用queue示例代码段中看到的 a 。id我用所有需要检索的数据流填充队列。

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

问题

我总是可以使用观察者模式,但我想知道是否有 Python 的方式来做这样的事情?

标签: pythonqueuethreadpool

解决方案


您可以执行以下操作:

def worker(q_request, q_response, api_cred):
    dr = DataRetriever(api_cred)
    while True:
        stream_id = q_request.get() # that's blocking unless q.get(False)
        if stream_id == "stop":
            sys.exit(0)
        dr.retrieve(stream_id) # that can take some time (assume blocking)
        q_response.put(stream_id) # signal job has ended to parent process

api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
    t.start()
    threads.append(t)

for item in source():
    q_request.put(item)
    print("Stream ID %s was successfully retrieved." %q_response.get())

这假设这dr.retrieve(stream_id)是阻塞的,或者您有某种方式知道由dr.retrieve(stream_id)尚未完成的子进程启动,因此您的工作人员将阻塞直到它完成(否则DataRetriever必须更改的实现)。

q.get()默认情况下是阻塞的,因此您的worker进程将与其他进程一起等待对象来接受它。Queue()对象也是先进先出的,因此您可以确保工作将在您的worker进程之间均匀分布。


推荐阅读