首页 > 解决方案 > 在 sqlAlchemy 中使用多线程时出现问题

问题描述

我的要求是将 mysql 表保存到 csv 文件。为了并行运行它,我想使用一个线程池同时执行多个to_csv这个函数,以便并行转储许多表。在这里,我重现了该问题并实现了如下代码:

import concurrent.futures
from sqlalchemy import create_engine

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

engine = create_engine(f'mysql+pymysql://user:passwd@host:3306/')
conn = engine.connect()

db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
table3 = 't_enterprise_2'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'
filename3 = '/data/test_threading3.csv'


def to_csv(db, table, filename, limit=None, delimiter=','):
    sql = f'select * from {db}.{table}'
    proxy = conn.execution_options(stream_results=True) \
      .execution_options(net_write_timeout=3600) \
      .execution_options(max_allowed_packet=67108864).execute(sql)
    outcsv.writerow(proxy.keys())
    while 'batch not empty':
        batch = proxy.fetchmany(10000)  # 100,000 rows at a time
        if not batch:
            break

        for row in batch:
            outcsv.writerow(row)

executor.submit(to_csv, db, table1, filename1)
executor.submit(to_csv, db, table2, filename2)
executor.submit(to_csv, db, table3, filename3)

问题:

奇怪的是它无法成功将数据写入csv,而是写入了一个空内容!虽然创建了 csv 文件,但内容为或只有 **header 里面。

-rw-rw-r-- 1 user user 20 Sep 18 10:05 test_threading1.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading2.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading3.csv

我的代码有什么问题?我想可能有一些问题conn.execute(sql)。那么这究竟是什么原因呢?或者还有其他方法可以将 MySQL 表sqlAlchemy并行转储到 csv 吗?

提前致谢。感谢是否有人可以给我一些建议。谢谢。

标签: pythonmysqlmultithreadingsqlalchemy

解决方案


根据@Ilja Everilä 的指南,我用多处理解决了它,但不是多线程。

这是代码:

def run_in_process(db, table, filename, delimiter=','):

    file_ = open(filename, 'w')
    outcsv = csv.writer(file_, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL)
    engine.dispose()

    with engine.connect() as conn:
        sql = f'select * from {db}.{table}'
        proxy = conn.execute(sql)

        outcsv.writerow(proxy.keys())
        while 'batch not empty':
            print('batch')
            batch = proxy.fetchmany(10000)  # 100,000 rows at a time
            if not batch:
                break

            for row in batch:
                outcsv.writerow(row)
    file_.close()


p1 = Process(target=run_in_process, args=(db, table1, filename1))
p2 = Process(target=run_in_process, args=(db, table2, filename2))
p1.start()
p2.start()
p.join()

参考:

https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing

sqlalchemy 中的连接池是线程安全的吗?


推荐阅读