首页 > 解决方案 > 如何仅创建三个线程或固定数量的线程并将 X 个列表分配给它们

问题描述

假设有一个方法从某个地方接收数据。在这种方法中,我们应该创建三个线程或固定数量的线程。每一条接收到的数据都将分配给一个线程。

也就是说,如果你会收到10个数据列表(注意数据项的个数不固定,不知道),第一个列表分配给第一个线程,第二个列表分配给第二个线程,第三个列表到第三个线程,第四个列表将重新开始并分配给第一个线程或任何可用线程等等。

所以我们唯一知道的数字是他们将在该方法中运行的线程数。

请注意,三个线程应该同时运行。一旦线程变得可用或完成其任务,它将获取下一个数据项并对其进行处理。

这就是我现在正在做的,但如果我有 30 个数据列表,将创建 30 个线程,这很糟糕。

threads = []
for ip in ip_list:
    for cmd in commands:
        th = threading.Thread(target=test_ssh_conn(ip,cmd), args=(ip,))  # args is a tuple with a single element
        th.start()
        threads.append(th)

for th in threads:
    th.join()

标签: pythonmultithreadingnetworking

解决方案


您可以创建固定数量的线程并使用线程安全的全局工作队列来存储任务。虽然有任务,但工作线程会轮询一个并处理它。一旦工作队列为空,线程可以重新加入 main。

由于 Python 的解释器是单线程的,请考虑使用多处理。API 相同,因此很容易根据需要在两者之间切换。

这是一个使用一些模拟数据和函数存根进行模拟的基本示例:

from multiprocessing import Process, Queue
from queue import Empty
from random import uniform
from time import sleep

def work():
    while 1:
        try:
            test_ssh_conn(*tasks.get(timeout=0.5))
        except Empty:
            break
            
    print("thread exiting")

def test_ssh_conn(ip, cmd):
    print("working on %s %d" % (ip, cmd))
    sleep(uniform(1.0, 2.0)) # pretend to do work
    print("done working on %s %d" % (ip, cmd))

if __name__ == '__main__':  
    thread_count = 3
    threads = []
    tasks = Queue()
    ip_list = ["172.16.0.0", "172.31.255.255", "192.168.0.0"]
    cmds = list(range(5))

    for ip in ip_list:
        for cmd in cmds:
            tasks.put((ip, cmd))
    
    for _ in range(thread_count):
        th = Process(target=work)
        threads.append(th)
        th.start()

    for th in threads:
        th.join()

推荐阅读