python - Python for循环csv并发
问题描述
我偶然发现了 80.000 行以上的大文件,我必须将这些文件保存在我的数据库中。将它全部推送到我的 mysql 数据库需要 20-30 分钟。我有一个简单的 for 循环,它只是循环整个 csv。
import csv
import MySQLdb
# open the connection to the MySQL server.
# using MySQLdb
mydb = MySQLdb.connect(host='hst', user='usr', passwd='pwd', db='db')
cursor = mydb.cursor()
with open('product_de.csv') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=';')
# execute and insert the csv into the database.
for row in csv_reader:
if "PVP_BIG" and "DATE_ADD" in row:
print "First line removed"
else:
print "Not found!"
sql = "INSERT INTO big (SKU,Category,Attribute1,Attribute2,Value1,Value2,Brand,Price,PVP_BIG,PVD,EAN13,WIDTH,HEIGHT,DEPTH,WEIGHT,Stock) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
val = (row[0], row[1],row[3],row[4], row[5],row[6], row[8], row[10], row[11], row[12], row[15], row[16], row[17], row[18], row[19], row[20])
cursor.execute(sql, val)
print row
#close the connection to the database.
#mydb.commit()
cursor.close()
print "CSV has been imported into the database"
有什么方法,我可以把它分成并发,所以根据计算机硬件可能需要3-5分钟?
解决方案
首先,您可以通过从内部循环中删除 print(row) 来获得很大的加速。程序中的所有其他内容都在等待此操作,这是一个 IO 操作,可能需要比您想象的要长得多的时间。其次,通过批处理 INSERT 语句(即一次插入多于一行,例如 100 条左右),您可能会发现显着的加速。第三,做到这一点的最好方法可能是涉及 asyncio 但我没有太多经验。您很可能 IO 必须与 DB 交谈并从 csv 文件中获取数据,并且从不同时进行这两项操作,因此我将使用如下所示的简单双线程解决方案:
import csv
import MySQLdb
import threading
from queue import Queue
def row_insert_thread(q: Queue, cursor, mydb):
while True:
command = q.get()
if command is None:
cursor.close()
#mydb.commit()
break
cursor.execute(*command)
mydb = MySQLdb.connect(host='hst', user='usr', passwd='pwd', db='db')
cursor = mydb.cursor()
insert_q = Queue()
row_thread = Thread(target=row_insert_thread,args=(insert_q,cursor,mydb)
row_thread.start()
with open('product_de.csv') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=';')
# execute and insert the csv into the database.
next(csv_reader) #skip the header row I'm assuming there is only one
for row in csv_reader:
sql = "INSERT INTO big (SKU,Category,Attribute1,Attribute2,Value1,Value2,Brand,Price,PVP_BIG,PVD,EAN13,WIDTH,HEIGHT,DEPTH,WEIGHT,Stock) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
val = (row[0], row[1],row[3],row[4], row[5],row[6], row[8], row[10], row[11], row[12], row[15], row[16], row[17], row[18], row[19], row[20])
insert_q.put((sql, val))
print row
#close the connection to the database.
insert_q.put(None)
row_thread.join()
print "CSV has been imported into the database"
对于插入语句,我不习惯 MySQL 从这里的 sqlite 体验,我认为这会起作用:
def insert_multiple_rows(cursor, rows:list):
sql = f"INSERT INTO big (SKU,Category,Attribute1,Attribute2,Value1,Value2,Brand,Price,PVP_BIG,PVD,EAN13,WIDTH,HEIGHT,DEPTH,WEIGHT,Stock) VALUES {'(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s),'*len(rows)}"[:-1]
args = [col for col in [row for row in rows]]
cursor.execute(sql,args)
如果你想使用它,我希望你可以将它集成到你的代码中,只需更改线程以获取列表,然后在主循环中将值添加到列表中,直到它达到你想要的任何数字或者你用完行,然后把列表到 insert_q
推荐阅读
- flutter - 如何将获取的数据从服务传递到不同的屏幕
- c - 当我想引用结构变量时,指针类型不兼容
- javascript - 如何激活隐藏输入的功能?
- python - 拖放后如何获取 PyQt5 QListView 索引以匹配模型的列表索引?
- excel - 删除 Windows Excel 中的所有空格
- vba - 访问 VBA - 使用变量名中的计数器循环
- extjs - 如何在 Ext JS 的日历组件中添加监听器?
- firebase - Firebase - Cloud Firestore(集合)
- css - 使用显示时使用 css 网格交替行颜色:行包装器的内容
- vba - 访问 vba 超链接