python - 在 sqlAlchemy 中使用多线程时出现问题
问题描述
我的要求是将 mysql 表保存到 csv 文件。为了并行运行它,我想使用一个线程池同时执行多个to_csv
这个函数,以便并行转储许多表。在这里,我重现了该问题并实现了如下代码:
import concurrent.futures
from sqlalchemy import create_engine
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
engine = create_engine(f'mysql+pymysql://user:passwd@host:3306/')
conn = engine.connect()
db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
table3 = 't_enterprise_2'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'
filename3 = '/data/test_threading3.csv'
def to_csv(db, table, filename, limit=None, delimiter=','):
sql = f'select * from {db}.{table}'
proxy = conn.execution_options(stream_results=True) \
.execution_options(net_write_timeout=3600) \
.execution_options(max_allowed_packet=67108864).execute(sql)
outcsv.writerow(proxy.keys())
while 'batch not empty':
batch = proxy.fetchmany(10000) # 100,000 rows at a time
if not batch:
break
for row in batch:
outcsv.writerow(row)
executor.submit(to_csv, db, table1, filename1)
executor.submit(to_csv, db, table2, filename2)
executor.submit(to_csv, db, table3, filename3)
问题:
奇怪的是它无法成功将数据写入csv,而是写入了一个空内容!虽然创建了 csv 文件,但内容为空或只有 **header 里面。
-rw-rw-r-- 1 user user 20 Sep 18 10:05 test_threading1.csv
-rw-rw-r-- 1 user user 0 Sep 18 10:05 test_threading2.csv
-rw-rw-r-- 1 user user 0 Sep 18 10:05 test_threading3.csv
我的代码有什么问题?我想可能有一些问题conn.execute(sql)
。那么这究竟是什么原因呢?或者还有其他方法可以将 MySQL 表sqlAlchemy
并行转储到 csv 吗?
提前致谢。感谢是否有人可以给我一些建议。谢谢。
解决方案
根据@Ilja Everilä 的指南,我用多处理解决了它,但不是多线程。
这是代码:
def run_in_process(db, table, filename, delimiter=','):
file_ = open(filename, 'w')
outcsv = csv.writer(file_, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL)
engine.dispose()
with engine.connect() as conn:
sql = f'select * from {db}.{table}'
proxy = conn.execute(sql)
outcsv.writerow(proxy.keys())
while 'batch not empty':
print('batch')
batch = proxy.fetchmany(10000) # 100,000 rows at a time
if not batch:
break
for row in batch:
outcsv.writerow(row)
file_.close()
p1 = Process(target=run_in_process, args=(db, table1, filename1))
p2 = Process(target=run_in_process, args=(db, table2, filename2))
p1.start()
p2.start()
p.join()
参考:
https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing
推荐阅读
- usb - WIN 10 中的 USB 枚举失败
- django - Django REST Framework - SerializerMethodField 在 serializer.save() 上为空
- css - 更改位置网格
- c# - 在 Linq 中合并两个集合
- javascript - SAPUI5:带参数的路由 - 如何获取绑定上下文的正确路径
- linux - Gnome:检测选择时复制
- firebase - 如何限制 Firebase Admin SDK 从 firebase 数据库中删除数据
- python - ubuntu中的opencv python屏幕截图
- azure-devops - 用于自动化 Azure DevOps Pipelines 的 API?
- angular - Angular 6:无法以模式提交表单