Can I use multiprocessing to query different servers with sqlalchemy?

Question

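I have several SQL servers that I want to query in parallel. To do this, I tried running the requests in separate processes, since I am not querying one server many times but many servers once each: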

import pandas as pd
from sqlalchemy import create_engine
from multiprocessing import Pool, cpu_count

def get_df(engine):
    sql_string = "select * from sys.all_columns"
    df = pd.read_sql(sql=sql_string, con=engine)
    return df


def create_odbc_engine(server):
    db_odbc_string = "mssql+pyodbc://@{server}-db:9999/some_database?driver=ODBC+Driver+17+for+SQL+Server".format(
        server=server)
    return create_engine(db_odbc_string)


if __name__ == "__main__":
    servers = ["server1", "server2", "server3",...]
    args = [(create_odbc_engine(server),) for server in servers]
    n_processes = cpu_count() - 1
    with Pool(processes=n_processes) as pool:
        results = pool.map(get_df, args)
    

But I get a pickle error:

AttributeError: Can't pickle local object 'create_engine.<locals>.connect'

Is there any way to do this in parallel?

Tags: python, pandas, sqlalchemy, multiprocessing

Answer


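Python cannot pickle functions defined inside another function, and the engine returned by create_odbc_engine holds a reference to one (create_engine.<locals>.connect, as the error message shows). Pool.map pickles its arguments to send them to the worker processes, so you cannot pass engines in args. Instead, pass the server names and create the engine inside get_df: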

import pandas as pd
from sqlalchemy import create_engine
from multiprocessing import Pool, cpu_count

def get_df(server):
    # Create the engine inside the worker process so it never has to be pickled.
    engine = create_odbc_engine(server)
    sql_string = "select * from sys.all_columns"
    df = pd.read_sql(sql=sql_string, con=engine)
    return df


def create_odbc_engine(server):
    db_odbc_string = "mssql+pyodbc://@{server}-db:9999/some_database?driver=ODBC+Driver+17+for+SQL+Server".format(
        server=server)
    return create_engine(db_odbc_string)


if __name__ == "__main__":
    servers = ["server1", "server2", "server3",...]
    # Pass the server names (plain, picklable strings) instead of engine objects.
    n_processes = cpu_count() - 1
    with Pool(processes=n_processes) as pool:
        results = pool.map(get_df, servers)
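
To see why passing engines fails while passing server names works, here is a minimal sketch that needs no database. The names top_level_worker, make_local_worker and can_pickle are hypothetical helpers made up for this illustration; make_local_worker only imitates the shape of create_engine.<locals>.connect and is not SQLAlchemy code.

```python
import pickle


def top_level_worker(x):
    """Module-level function: pickle stores it by reference (module + name)."""
    return x * 2


def make_local_worker():
    """Return a function defined inside another function.

    This imitates the shape of 'create_engine.<locals>.connect' from the
    error message; it is an illustration, not SQLAlchemy's actual code.
    """
    def connect(x):
        return x * 2
    return connect


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type for local objects varies by Python version.
        return False


if __name__ == "__main__":
    print("top-level function:", can_pickle(top_level_worker))
    print("local function:", can_pickle(make_local_worker()))
    print("server name string:", can_pickle("server1"))
```

A plain string such as "server1" pickles without trouble, which is why sending the server names to the pool and building each engine inside the worker avoids the error.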
