Is there a way to dynamically change/add queue contents while tasks are being processed by threads in Python

Problem description

I'm new to multithreading, but I've learned that it can be very useful for my use case. I have an initial queue of tasks to run, and the program below is the approach I'm starting from.

from queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print(q.get())
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    worker.daemon = True
    worker.start()

for x in range(100):
    q.put(x)

q.join()

I have searched quite a bit for whether we can change/add tasks in the queue, but found no information. My workflow starts with some initial tasks; once those complete, further tasks run that depend on them (these dependencies span nearly thousands of tasks). So I want to keep adding tasks to the queue based on the success/failure of earlier tasks, while limiting the number of concurrent threads.
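To make the idea concrete, here is roughly the behavior I'm after: workers put follow-up items onto the same queue they consume from. The `follow_up` and `run` functions below are made up purely for illustration. Since `q.put()` increments the queue's unfinished-task count, `q.join()` still waits for the dynamically added work:

```python
import queue
import threading

q = queue.Queue()
processed = []  # list.append is thread-safe in CPython

# Hypothetical dependency rule, for illustration only:
# a successful task n spawns task n + 100, up to a cutoff of 300.
def follow_up(task):
    nxt = task + 100
    return nxt if nxt < 300 else None

def run(task):
    return True  # pretend every task succeeds

def worker():
    while True:
        task = q.get()
        try:
            if run(task):
                processed.append(task)
                nxt = follow_up(task)
                if nxt is not None:
                    q.put(nxt)  # dynamically add a dependent task
        finally:
            q.task_done()

for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for x in range(100):
    q.put(x)

q.join()  # waits for the original AND the dynamically added tasks
print(len(processed))  # 300: tasks 0-99 plus follow-ups 100-299
```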

Tags: python, multithreading, multiprocessing, queue

Solution


Update

With all of your comments, it now appears that what you have are 10 separate sets of values forming 10 dependency chains:

Chain 1: [1, 11, 21, 31, ...]
Chain 2: [2, 12, 22, 32, ...]
...
Chain 10: [10, 20, 30, 40, ...]

You can run the first value of each chain as concurrent tasks in a thread pool (i.e. 1, 2, ... 10). If a task completes successfully, you run the next value in its chain; otherwise you are done with that chain, since each successive value in a chain runs only upon successful completion of the previous one.

Once you come up with a way of expressing these dependency chains, this becomes quite simple:

from multiprocessing.pool import ThreadPool as Pool

def process_x_value(x):
    """
    Process current x value.
    Note that this is invoked by a simple call from run_dependency_chain,
    which is already threaded.
    This function must not be CPU-intensive or else you will not achieve any
    level of concurrency using multithreading.
    """
    import time
    time.sleep(.1) # simulate some I/O
    # return success or failure
    return True # success

def run_dependency_chain(x):
    """
    Process value x; if successful, process the next x value that depends
    on its successful completion.
    Repeat until there is no next x value (end of dependency chain).
    """
    while True:
        result = process_x_value(x)
        if not result: # failure
            return
        results[x] = True # just store successful results
        x = next_x.get(x)
        if x is None:
            return


# we will be running 10 concurrent dependency chains:
# if task 1 completes successfully, next task to run is 11
# if task 2 completes successfully, next task to run is 12
# ...
# if task 10 completes successfully, next task to run is 20
"""
Thus the successor task can be computed by adding 10 to the current task,
but we will assume in general a more complicated relationship is possible. So we will
use a quasi-linked list of dependencies implemented using a dictionary, next_x,
where next_x[x] gives the successor x to be run on successful completion
of task x.
"""
# at most 2000 successful tasks:
next_x = {x: x + 10 for x in range(1, 1991)}

# to hold results, if you are interested:
results = {}
pool = Pool(10)
pool.map(run_dependency_chain, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(len(results)) # number of successful results

Prints:

2000

If process_x_value is sufficiently I/O bound, multithreading should cut your running time by nearly a factor of 10.
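On Python 3, the same chain-per-worker pattern can also be written with the standard-library `concurrent.futures` module instead of `multiprocessing.pool.ThreadPool`. This is an equivalent sketch of the approach above (with a much shorter sleep so it finishes quickly):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def process_x_value(x):
    time.sleep(0.001)  # simulate some I/O
    return True  # success

# next_x[x] is the successor to run after task x succeeds (at most 2000 tasks)
next_x = {x: x + 10 for x in range(1, 1991)}
results = {}

def run_dependency_chain(x):
    # walk the chain until a failure or the end of the chain
    while x is not None:
        if not process_x_value(x):
            return
        results[x] = True  # distinct keys per chain, so writes don't collide
        x = next_x.get(x)

# one worker per dependency chain
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(run_dependency_chain, range(1, 11)))

print(len(results))  # 2000
```

The `with` block calls `shutdown(wait=True)` on exit, so all chains are guaranteed to have finished before `len(results)` is printed.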
