How can I use multiprocessing or multithreading to speed up this loop?

Problem description

I'm afraid I'm not doing the multithreading thing the right way, so I came here looking for wisdom. I have two arrays of addresses, and I have to check whether each address in the first array exists in the second array; if it doesn't, I need to find the most similar address in array 2.

The array with the "official" addresses is called directory, and the array I need to verify is called look_address.
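In other words, the rule per address is: exact hit in directory, otherwise rank directory entries by similarity. A minimal self-contained sketch of that rule, where difflib's SequenceMatcher stands in for fuzzywuzzy's fuzz.ratio and the sample addresses are made up:

```python
# Sketch of the lookup rule for one address: exact hit in `directory`,
# otherwise return the most similar entry. difflib stands in here for
# fuzzywuzzy's fuzz.ratio (both score higher = more similar).
from difflib import SequenceMatcher

directory = ["123 Main St", "456 Oak Ave"]  # toy stand-in data

def lookup(addr):
    addr = addr.strip()
    if addr in directory:
        return ("exact", addr)
    best = max(directory,
               key=lambda cand: SequenceMatcher(None, addr, cand).ratio())
    return ("fuzzy", best)
```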

The code is as follows:

import pandas as pd
import numpy as np
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from datetime import datetime,timedelta
import threading
import queue

class myThread(threading.Thread):
    def __init__(self,threadID,name,q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name=name
        self.q = q
        
    def run(self):
        print(f"starting {self.name}")
        process_data(self.name,self.q)
        print(f"ending {self.name}")


locs = []    # addresses found verbatim in the official directory
ratios = {}  # fuzzy-match candidates for addresses not found exactly
def process_data(threadName,q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            d = q.get()
            queueLock.release()
            d = d.strip()
            if directory.isin([d]).any():

                locs.append(d)
                
            else:

                pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
                ratios[d] = pos

        else:
            queueLock.release()

threadlist = ["T-1","T-2","T-3","T-4","T-5","T-6","T-7","T-8","T-9","T-10"]
nameList = look_address  # the addresses to verify (loaded elsewhere)
queueLock = threading.Lock()
workQueue = queue.Queue(len(nameList)+1)
threads=[]
threadID=1

exitFlag=0
for name in threadlist:
    thread = myThread(threadID,name,workQueue)
    thread.start()
    threads.append(thread)
    threadID+=1
    
queueLock.acquire()
for addr in nameList:
    workQueue.put(addr)
queueLock.release()

total_steps = len(workQueue.queue)

t0 = datetime.now()
while not workQueue.empty():
    total_seconds = (datetime.now()-t0).total_seconds()
    progress = 1 - len(workQueue.queue)/total_steps

    # 'errors' was undefined in the original; ratios holds the fuzzy fallbacks
    print("\rProgress: {pr:.2f}% || elapsed: {elap} || exact/fuzzy: {gb}/{bd}".format(
                    pr = progress*100,
                    elap = timedelta(seconds=np.round(total_seconds)),
                    gb = len(locs),
                    bd = len(ratios)), end="", flush=True)


exitFlag = 1

for t in threads:
    t.join()
print("\nExiting Main Thread")

Each process.extract call takes about 25 seconds (measured with %timeit). Now, the script above doesn't seem to speed things up at all: it has been running for about 2 hours and has made roughly 4.29% progress.

My two questions are:

  1. Is the multithreading implemented correctly?
  2. How can I speed up the processing? Perhaps by running it on an Amazon or Google VPS?

I'd like to understand why this is so slow and how to make it faster.

Edit: changed from:

if not workQueue.empty():
     d = q.get()
     d = d.strip()
     if directory.isin([d]).any():

          locs.append(d)
                    
     else:
    
          pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
          ratios[d] = pos
          queueLock.release()

to:

if not workQueue.empty():
     d = q.get()
     queueLock.release()
     d = d.strip()
     if directory.isin([d]).any():

          locs.append(d)
                    
     else:
    
          pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
          ratios[d] = pos
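A side note on this edit: queue.Queue is already thread-safe, so the manual queueLock around get() (and the empty()/get() race it tries to paper over) isn't needed. A common pattern is to let workers block on get() and shut down via a sentinel. A minimal sketch, where the worker body is just a stand-in transformation rather than the real matching:

```python
# Sketch: consuming a queue.Queue without an extra lock.
# The queue serializes access internally; workers exit on a None sentinel.
import queue
import threading

q = queue.Queue()
results = []
res_lock = threading.Lock()      # only the shared results list needs a lock

def worker():
    while True:
        item = q.get()           # blocks until an item is available
        if item is None:         # sentinel: no more work
            break
        with res_lock:
            results.append(item.strip().upper())  # stand-in for matching

for addr in [" 123 main st ", "456 oak ave"]:
    q.put(addr)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for _ in threads:
    q.put(None)                  # one sentinel per worker
for t in threads:
    t.join()
```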

Tags: python-3.x, multithreading, parallel-processing

Solution

