python-3.x - 如何使用多处理或多线程加速此循环?
问题描述
恐怕我没有以正确的方式做多线程的事情,所以我来到这里寻求智慧。我有两个地址数组,我必须检查第一个数组的地址是否存在于第二个数组中,以防它在数组 2 中找不到最相似的地址。
调用具有“官方”地址directory
的数组,调用我需要验证的数组look_address
。
代码如下:
import pandas as pd
import numpy as np
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from datetime import datetime,timedelta
import threading
import queue
class myThread(threading.Thread):
def __init__(self,threadID,name,q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name=name
self.q = q
def run(self):
print(f"starting {self.name}")
process_data(self.name,self.q)
print(f"ending {self.name}")
locs = []
ratios={}
def process_data(threadName,q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
d = q.get()
queueLock.release()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
ratios[d] = pos
else:
queueLock.release()
threadlist = ["T-1","T-2","T-3","T-4","T-5","T-6","T-7","T-8","T-9","T-10"]
nameList = look_address
queueLock = threading.Lock()
workQueue = queue.Queue(len(nameList)+1)
threads=[]
threadID=1
exitFlag=0
for name in threadlist:
thread = myThread(threadID,name,workQueue)
thread.start()
threads.append(thread)
threadID+=1
queueLock.acquire()
for addr in nameList:
workQueue.put(addr)
queueLock.release()
total_steps = len(workQueue.queue)
tot_sec = 0
t0 = datetime.now()
while not workQueue.empty():
total_seconds =(datetime.now()-t0).total_seconds()
if total_seconds == 0:
total_seconds = 1e-8
progress = 1-len(workQueue.queue)/total_steps
tot_sec+=total_seconds
print("\rProgreso: {pr:.2f}% || Buenas/Errores: {gb}/{bd}".format(
pr = progress*100,
its = 1/total_seconds,
elap = timedelta(seconds=np.round(tot_sec)),
gb=len(locs),
bd=len(errors),
eta = timedelta(seconds=np.round(total_seconds*(total_steps-len(workQueue.queue))))),end="",flush=True)
exitFlag = 1
for t in threads:
t.join()
print("\nExiting Main Thread")
每个请求process.extract
大约需要 25 秒(做了一个%timeit
)。现在,使用上面的脚本似乎并没有加快数据处理速度。它已经运行了大约 2 个小时,并且进步了大约 4.29%。
我的两个问题是:
- 多线程的实现是否正确?
- 如何加快数据处理速度?也许在亚马逊或谷歌的 VPS 上运行它?
我想了解为什么这会这么慢以及如何加快速度。
编辑:更改自:
if not workQueue.empty():
d = q.get()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
ratios[d] = pos
queueLock.release()
至:
if not workQueue.empty():
d = q.get()
queueLock.release()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
ratios[d] = pos
解决方案
推荐阅读
- django - 从管理员创建新用户时不调用 CustomUserManager
- excel - 查找列中每个事件的第一个和最后一个地址(动态)
- c++ - 为什么向向量添加智能指针会导致内存泄漏?
- python - FbProphet 模型给出 TypeError: float() argument must be a string or a number, not 'datetime.datetime'
- ssl - 如何在 redbird 代理服务器上更新 ssl 证书
- java - JPA - 映射具有相同实体的列
- javascript - 在 JavaScript 中对字符串调用 forEach
- css - 如何修复“没有这样的文件或目录,lstat 'scss/'”?
- activerecord - Yii ActiveRecord 的 children() 和 parent() 方法是什么?
- javascript - 要创建一个页面,然后获取数据并将其存储在 java 脚本对象中,然后使用 HTTPPOST 将数据发送到 .Net