python - 有没有办法在使用python中的线程处理任务时动态更改/添加队列内容
问题描述
我是多线程的新手,但了解到这对我的用例非常有用。我有一个要运行的初始任务队列,下面的程序方法将对我有所帮助。
from Queue import Queue
from threading import Thread
def do_stuff(q):
while True:
print q.get()
q.task_done()
q = Queue(maxsize=0)
num_threads = 10
for i in range(num_threads):
worker = Thread(target=do_stuff, args=(q,))
worker.setDaemon(True)
worker.start()
for x in range(100):
q.put(x)
q.join()
我已经研究了很多关于我们是否可以更改/添加队列中的任务但没有任何信息。我的流程最初有一些任务,一旦这些任务完成,就会运行一些任务(依赖关系——这跨越了近数千个任务)。因此,我想根据先前任务的成功/失败继续将任务添加到队列中,并限制并发线程的数量。
解决方案
更新
有了您的所有评论,现在看来您拥有的是 10 个独立的值集,创建了 10 个依赖链:
Chain 1: [1, 11, 21, 31, ...]
Chain 2: [2, 12, 22, 32, ...]
...
Chain 10: [10, 20, 30, 40, ...]
您可以将每个链中的第一个值作为线程池中的并发任务运行(即 1、2、... 10),如果任务成功完成,那么您可以运行链中的下一个值,否则您将完成该链,因为链中的每个连续值仅在成功完成前一个值时运行。
一旦你想出了表达这些依赖链的方法,这将变得非常简单:
from multiprocessing.pool import ThreadPool as Pool
def process_x_value(x):
"""
Process current x value.
Note that this is invoked by a simple call from run_dependency_chain,
which is already threaded.
This function must not be CPU-intensive or else you will not achieve any
level of concurrency using multithreading.
"""
import time
time.sleep(.1) # simulate some I/O
# return success or failure
return True # success
def run_dependency_chain(x):
"""
Process value x, if sucessful process next x value that was dependent
on successful completion.
Repeat until there is no next x value (end of dependency chain).
"""
while True:
result = process_x_value(x)
if not result: # failure
return
results[x] = True # just store successful results
x = next_x.get(x)
if x is None:
return
# we will be running 10 concurrent dependency chains:
# if task 1 completes successfully, next task to run is 11
# if task 2 completes successfully, next task to run is 12
# ...
# if task 10 completes successfully, next task to run is 20
"""
Thus the successor task can be computed by adding 10 to the current task,
but we will assume in general a more complicated relationship is possible. So we will
use a quasi-linked list of dependencies implemented using a dictionary, next_x,
where next_x[x] gives the successor x to be run on successful completion
of task x.
"""
# at most 2000 successful tasks:
next_x = {x: x + 10 for x in range(1, 1991)}
# to hold results, if you are interested:
results = {}
pool = Pool(10)
pool.map(run_dependency_chain, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(len(results)) # number of succesful results
印刷:
2000
如果process_x_value
有足够的 I/O 限制,多线程应该可以将您的运行时间减少近 10 倍。
推荐阅读
- ios - 错误域 = FIRFirestoreErrorDomain 代码 = 7“缺少权限或权限不足。”
- c++ - 使用虚拟类时CUDA非法内存访问
- python - 如何在python中删除列表的所有字符串上的特定字符?
- next.js - NextJs 自定义服务器
- c++ - 试图将基类函数指针指向派生类函数
- android - Android 背景,四面都有阴影
- firebase - 可以在客户端实现firestore读取而不用担心吗?
- android - 以编程方式在 Android 中修改和显示联系人的最佳方式(Kotlin)?
- python - 无法消除熊猫 SettingWithCopyWarning
- javascript - 在我的 vue js 项目中清理 home.vue 文件中的一些代码后,我一直看到这些错误