首页 > 解决方案 > SQLAlchemy 到 MSSQL 使用 create_engine

问题描述

我能找到的大多数显示使用 Python 的完整 MSSQL 连接方法的示例在几个月前已经过时,部分原因是 SQLAlchemy 1.3中的一些优化。我正在尝试复制我在文档中看到的内容。

我无法使用 pyodbc 将 SQLAlchemy 连接到 MSSSQL Server。

我有一个本地 SQL 服务器,可以从 SQL Server Management Studio 访问:#DESKTOP-QLSOTTG\SQLEXPRESS
数据库是:TestDB
用户名,对于这个例子是:TestUser
密码,对于这个例子是:TestUserPass

我想运行一个将 pandas 数据帧导入 MSSQL 数据库的测试用例(用例?),以便找出最快的处理方式。但是,这个问题的目的是围绕连接性。

信用:我从 Gord 那里借了一些代码用于数据框/更新

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


# for pyodbc
#engine = create_engine('mssql+pyodbc://TestUser:TestUserPAss@DESKTOP-QLSOTTG\\SQLEXPRESS:1433/TestDB?driver=ODBC+Driver+17+for+SQL+Server', fast_executemany=True)
engine = create_engine("mssql+pyodbc://TestUser:TestUserPass@DESKTOP-QLSOTTG\\SQLEXPRESS:1433/TestDB?driver=ODBC+Driver+13+for+SQL+Server", fast_executemany=True)

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

我收到的错误如下。我将假设服务器“主动拒绝连接”是因为我的连接字符串以某种方式搞砸了,但我似乎不明白为什么。:

OperationalError: (pyodbc.OperationalError) ('08001', '[08001] [Microsoft][ODBC Driver 13 for SQL Server]TCP Provider: No connection could be made because the target machine actively refused it.\r\n (10061) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 13 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 13 for SQL Server]A network-related or instance-specific error has occurred while establishing a connection to SQL Server. Server is not found or not accessible. Check if instance name is correct and if SQL Server is configured to allow remote connections. For more information see SQL Server Books Online. (10061)')
(Background on this error at: http://sqlalche.me/e/13/e3q8)

可以从 SQL Server Management Studio 连接数据库和用户。*

关于我可能遗漏的任何想法?

SQL Server 屏幕截图

笔记:

标签: pythonsql-serverpandassqlalchemy

解决方案


我将用一个完整的例子来回答这个问题,因为在此过程中我遇到了一些其他问题。

这个例子能够:

  • fast_executemany与用户指定的内存友好分块一起使用,可以非常快速地将数据加载到 MS SQL 数据库。
  • 在大约 0.3 秒内将 10,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 在大约 45 秒内将 1,000,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 在大约 9 分钟内将 10,000,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 使用预先配置的函数对数据进行分块,避免使用 pandas chunksize,这会导致较大数据集上的内存错误。分块的功劳
  • 可以帮助您将数据加载到内存较低的 SQL 服务器中。批量加载需要大量内存,较小的加载大小使用更少的内存。
  • 可以添加一个 try/except 语句来捕获您尝试记录的块的加载错误/稍后再试类型设置。

我为其他一些数据库提供程序包含了一些未经测试的连接字符串。截至 2020 年 12 月的 pandas、sqlalchemy、pyodbc 等的当前版本。

%%time #remove this if you are not using a Jupyter notebook and just want to run a .py script

import pandas as pd
import numpy as np
import sqlalchemy as sql
import sys
import math

# Enterprise DB to be used
DRIVER = "ODBC Driver 17 for SQL Server"
USERNAME = "TestUser"
PSSWD = "TestUser"
SERVERNAME = "DESKTOP-QLSOTTG"
INSTANCENAME = "\SQLEXPRESS"
DB = "TestDB"
TABLE = "perftest"


conn_executemany = sql.create_engine(
    f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}", fast_executemany=True
)



def chunker(seq, size):
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def insert_with_progress(df, engine, table="", schema=""):
    con = engine.connect()

    # Replace table
    engine.execute(f"DROP TABLE IF EXISTS {schema}.{table};")

    # Insert with progress
    SQL_SERVER_CHUNK_LIMIT = 100000
    chunksize = math.floor(SQL_SERVER_CHUNK_LIMIT / len(df.columns))

    for chunk in chunker(df, chunksize):
        chunk.to_sql(
            name=table,
            con=con,
            if_exists="append",
            index=False
        )
        
df = pd.DataFrame(np.random.random((10 ** 7, 24)))
df['TextCol'] = "Test Goes Here"
df.head()
print("DataFrame is", round(sys.getsizeof(df) / 1024 ** 2, 1), "MB")
print("DataFrame contains", len(df), "rows by", len(df.columns), "columns")


# Doing it like this errors out. Can't seem to be able to debug the straight pandas call.
# df.to_sql(TABLE, conn_sqlalchemy, index=False, if_exists='replace', method='multi', chunksize=2100)

insert_with_progress(df, conn_executemany, table=TABLE)

关于连接字符串:

  1. f"mssql+pyodbc://如果您希望更改为另一种数据库类型,您最可能只需要更改以开头的行
  2. 如果您的 SQL 服务器不使用实例名称(例如 SQLSERVERNAME\Instance_Name),那么您可以将实例名称参数设置为空。
  3. 如果您确实使用实例名称,请确保将 \ 留在变量的开头。
  4. 如果您使用不同的连接字符串,您还需要将变量名称替换为上面代码窗口最后一行中的连接字符串名称。

其他提供者的备用包含语句
这些包括:

  • pymssql
  • 涡轮增压器
import pymssql as ms
import sqlalchemy as sql
import sqlalchemy_turbodbc as st

备用连接字符串
归功于DSN 样式字符串,我已对其进行了修改以使用用户名/密码。

conn_sqlalchemy = sql.create_engine(f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")

conn_executemany = sql.create_engine(
    f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}", fast_executemany=True
)

conn_turbodbc = sql.create_engine(f"mssql+turbodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")

conn_pymssql = sql.create_engine(f"mssql+pymssql://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}")

推荐阅读