首页 > 解决方案 > 如何使用 threadpool.executor 在线程之间共享计数器变量并增加它?

问题描述

以下是我在 python 3.x 中实现的线程池执行器

  with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = {ex.submit(callreadline, files ): files for files in f.readlines() }

结果变量包含以下格式的值

单词及其对应的 200 维嵌入

您可以看到这些值是元组。第一个值是一个单词,第二个值是 200 维数组。值的数量总共为 400000。所以有 400000 个元组。

现在我要做的是创建另一个执行以下任务的线程池执行器

  1. 创建元组列表中第一个值的有序字典。这意味着前 4 个元组值的 say 单词是the, is, are, said。然后有序字典将包含:

{the:0,is:1,are:2,said:3,…………………….hello:399999}

  1. 创建一个 numpy nd 数组,其中包含有序字典中相应单词的 200 维数组(通过相应的单词,我的意思是第一个条目将是单词the的 200 维数组,然后200 维数组......并且列表去上)。所以 numpy nd 数组的维度为 400000 * 200。

我正在使用带有以下代码的for循环

    count = 0
    word_to_idx = OrderedDict()
    vectors = []
    for future in results.result:
            b = future.result()
            word_to_idx[count] = b[0]
            if(count == 0):
                vectors =  np.array([b[1]])
            else:    
                vectors = np.append(vectors,np.array([b[1]]),axis=0)
            count = count +1

在上述函数结束时,我返回了完成这项工作的 word_to_idx 和向量。然而,循环 400000 个元组并一个一个地分配给变量需要非常长的时间(大约 10 小时)。

所以我在想是否有一种方法可以并行化这个功能以及使用线程池执行器。

我正在考虑创建线程,然后与每个线程一次访问一个共享变量共享一个计数器变量。然后线程将增加该变量,然后另一个线程将访问增加的计数器。有人能指出我正确的方向吗?

编辑:

这是调用 readline 函数,它运行得非常快,因为它被 15 个工作人员调用:

def callreadline(line):
        # word_to_idx = OrderedDict() 
        word_to_idx = OrderedDict()
        vectors = []
        vocabulary = None
        word_to_idx = read_w2v_word(line.split(' ')[0])
        try:
            vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
        except:
            vectors = np.array(line.split(' ')[1:],dtype=float)
        if vocabulary is not None:
            word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
        return word_to_idx,vectors

标签: arrayspython-3.xconcurrent.futures

解决方案


我有一种感觉 callreadline 函数也没有达到它可能的速度,但这不是问题的一部分,所以让我试着为你解决剩下的问题:

with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = [ex.submit(callreadline, files) for files in f.readlines()]

word_to_idx = dict()
vectors = []
for count, future in enumerate(results):
    b = future.result()
    word_to_idx[b[0]] = count
    vectors.append(b[1])

vectors = np.array(vectors)


推荐阅读