python - Python Pyodbc 无法通过 executemany 返回第一个插入的 id
问题描述
设置
MWE:我在 SQL Server 中有一张这样的表
CREATE TABLE dbo.MyTable(
order_id INT IDENTITY(1,1),
column2 DECIMAL,
column3 INT
PRIMARY KEY(order_id)
)
我正在使用 pyodbc 将一些数据以 pandas.DataFrame 的形式插入到表中。我正在使用以下数据:
column2 column3
0 1.23 5
1 4.95 9
2 6.79 10
我在哪里创建了这个示例数据框
data = pd.DataFrame({'column2':[1.23, 4.95, 6.79], 'column3':[5,9,10]})
我使用以下语句插入数据
stmt = "INSERT INTO dbo.MyTable(column2, column3) OUTPUT Inserted.order_id VALUES (?, ?)"
问题
这是我用来插入所有内容并返回值的代码:
# Set up connection and create cursor
conn_string = "DRIVER={MyDriver};SERVER=MyServer;DATABASE=MyDb;UID=MyUID;PWD=MyPWD"
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Upload data
cursor.executemany(stmt, data.values.tolist())
# Process the result
try:
first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
first_result = None
result_sets = []
while cursor.nextset():
result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()
但是,我没有得到我应该得到的所有 id!例如,假设表中没有数据,我不会得到
all_inserted_ids = np.array([1, 2, 3])
但我只会得到
all_inserted_ids = np.array([2, 3])
这意味着我在某处丢失了第一个 ID!
请注意,这first_result
永远不会奏效。它总是抛出以下内容:
pyodbc.ProgrammingError: No results. Previous SQL was not a query.
我也尝试过使用cursor.fetchone()
,cursor.fetchone()[0]
或者cursor.fetchval()
他们给了我同样的错误。
我尝试过但不起作用的方法
1)添加“SET NOCOUNT ON”
我尝试使用与问题相同的代码,但使用
stmt =
"""
SET NOCOUNT ON;
INSERT INTO dbo.MyTable(column2, column3)
OUTPUT Inserted.order_id
VALUES (?, ?)
"""
输出是[1, 2]
如此我失踪3
。
2)添加“SET NOCOUNT ON”并将输出插入表变量
我使用了以下语句:
stmt =
"""
SET NOCOUNT ON;
DECLARE @NEWID TABLE(ID INT);
INSERT INTO dbo.MyTable(column2, column3)
OUTPUT Inserted.order_id INTO @NEWID(ID)
VALUES (?, ?)
SELECT ID FROM @NEWID
"""
这同样不起作用,因为我只获得了 '[2, 3]' 但没有获得 '1'。
3)选择@@IDENTITY
我使用了以下语句:
stmt =
"""
INSERT INTO dbo.MyTable(column2, column3)
OUTPUT Inserted.order_id
VALUES (?, ?)
SELECT @@IDENTITY
"""
但它没有像我得到的那样工作array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')]
4)使用 SET NOCOUNT ON 选择 @@IDENTITY
我用了
stmt =
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3)
OUTPUT Inserted.order_id
VALUES (?, ?);
SELECT @@IDENTITY
"""
但我又得到array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')], dtype=object)
了。
5)不使用 OUTPUT 选择 @@IDENTITY
我用了:
stmt =
"""
INSERT INTO dbo.MyTable(column2, column3)
VALUES (?, ?);
SELECT @@IDENTITY
"""
但我得到了[Decimal('2') Decimal('3')]
6)选择@@IDENTITY 而不使用 OUTPUT 但使用 SET NOCOUNT ON
我用了:
stmt =
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3)
VALUES (?, ?);
SELECT @@IDENTITY
"""
但我又得到了:[Decimal('2') Decimal('3')]
解决这个问题的可能方法,这真的很糟糕,但确实有效
一种可能的方法是创建一个新表,我们将在其中存储 id 并在完成后截断它。这太可怕了,但我找不到任何其他解决方案..
创建表:
CREATE TABLE NEWID(
ID INT
PRIMARY KEY (ID)
)
接下来是完整的代码:
import pyodbc
import pandas as pd
import numpy as np
# Connect
conn_string = """
DRIVER={MYDRIVER};
SERVER=MYSERVER;
DATABASE=DB;
UID=USER;
PWD=PWD
"""
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Data, Statement, Execution
data = pd.DataFrame({'column2': [1.23, 4.95, 6.79], 'column3': [5, 9, 10]})
stmt = """
INSERT INTO dbo.MyTable(column2, column3)
OUTPUT Inserted.order_id INTO NEWID(ID)
VALUES (?, ?);
"""
cursor.executemany(stmt, data.values.tolist())
cursor.execute("SELECT ID FROM NEWID;")
# Get stuff
try:
first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
first_result = None
result_sets = []
while cursor.nextset():
result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()
print('First result: ', first_result)
print('All IDs: ', all_inserted_ids)
cursor.commit()
# Remember to truncate the table for next use
cursor.execute("TRUNCATE TABLE dbo.NEWID;", [])
cursor.commit()
这将返回
First result: [(1, ), (2, ), (3, )]
All IDs: []
所以我们只保留第一个结果。
解决方案
我已经实现了一个类似于你的方法 1)的方法,使用带有 pyodbc 方言的 sqlAlchemy。它可以很容易地直接适应 pyodbc 库。诀窍是SELECT NULL;
在插入查询之前有一个。这样插入查询的第一个 OUTPUT 将在返回的集合中。如果您插入n
了行,则使用此方法需要2n-1
使用游标的nextset()
. 这是一个补丁,因为 MSSQL 或 pyodbc 会丢弃第一组。我想知道是否有一个选项是 MSSQL 服务器或 pyodbc,您可以在其中指定返回第一组。
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import TableClause
def bulk_insert_return_defaults_pyodbc(
session: Session, statement: TableClause, parameters: List[dict], mapping: dict
):
"""
Parameters
----------
session:
SqlAlchemy Session object
statement:
SqlAlchemy table clause object (ie. Insert)
parameters:
List of parameters
ex: [{"co1": "value1", "col2": "value2"}, {"co1": "value3", "col2": "value4"}]
mapping
Mapping between SqlAlchemy declarative base attribute and name of column in
database
Returns
-------
"""
if len(parameters) > 0:
connexion = session.connection()
context = session.bind.dialect.execution_ctx_cls._init_statement(
session.bind.dialect,
connexion,
connexion._Connection__connection.connection,
statement,
parameters,
)
statement = context.statement.compile(
session.bind, column_keys=list(context.parameters[0].keys())
)
session.bind.dialect.do_executemany(
context.cursor,
"SELECT NULL; " + str(statement),
[
tuple(p[p_i] for p_i in statement.params.keys())
for p in context.parameters
],
context,
)
results = []
while context.cursor.nextset():
try:
result = context.cursor.fetchone()
if result[0] is not None:
results.append(result)
except Exception:
continue
return [
{mapping[r.cursor_description[i][0]]: c for i, c in enumerate(r)}
for r in results
]
else:
return []
multi_params = bulk_insert_return_defaults_pyodbc(
session,
table_cls.__table__.insert(returning=[table_cls.id]),
multi_params,
{
getattr(table_cls, c).expression.key: c
for c in list(vars(table_cls))
if isinstance(getattr(table_cls, c), InstrumentedAttribute)
},
)