python - 在多个不同的工作人员之间共享排队的工作负载
问题描述
关于
我有一个DataRetriever
需要使用 API 凭据实例化的类。我有五组不同的 API 凭据,因此我想实例化DataRetriever
. DataRetriever
只有一个公共方法retrieve
,顾名思义,它会subprocess
根据id
传递给该方法的参数来检索一些数据。
- 给定的 API 凭据不能同时打开多个流(具有任何 ID)
- a
DataRetriever
最多只能有一个与 API 的连接,因此DataRetriever#retrieve(id)
不得在DataRetriever
仍在检索数据流的实例上调用 - 数据量会有所不同,因此子进程退出之前的时间可以是几秒到几分钟之间的任何时间
目前的方法
我正在使用queue
示例代码段中看到的 a 。id
我用所有需要检索的数据流填充队列。
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
问题
我总是可以使用观察者模式,但我想知道是否有 Python 的方式来做这样的事情?
- 如何
worker
从上面的代码片段中确保将排队的工作负载仅分配给 idling ,同时无缝DataRetriever
使用所有五个实例?DataRetriever
- 在研究时,我发现
ProcessPoolExecutor
无法使示例适应我的场景。这可能是解决方案吗?
解决方案
您可以执行以下操作:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
这假设这dr.retrieve(stream_id)
是阻塞的,或者您有某种方式知道由dr.retrieve(stream_id)
尚未完成的子进程启动,因此您的工作人员将阻塞直到它完成(否则DataRetriever
必须更改的实现)。
q.get()
默认情况下是阻塞的,因此您的worker
进程将与其他进程一起等待对象来接受它。Queue()
对象也是先进先出的,因此您可以确保工作将在您的worker
进程之间均匀分布。
推荐阅读
- excel - 使用 Excel VBA 删除或覆盖 SharePoint 列表?
- javascript - 如何修复 404 错误 Node JS Express POST 请求
- c++ - 用于调用一系列具有相似名称(即 f_0、f_1、f_2、...)的宏生成函数的宏
- ios - 在 iOS 图表的 x 轴上重复值
- javascript - 如何创建将用户输入发送到另一个组件的 vue 模式
- python - 即使我尝试卸载并重新安装,也会出现 (ModuleNotFoundError: No module named 'requests') 错误
- javascript - 磁悬浮与光标交互
- javascript - 从 onclick 函数中获取按钮的 ID
- javascript - 使用 React Router 隐藏路由的特定组件
- kotlin - 计算 ArrayList (Kotlin) 中的连续重复值