首页 > 解决方案 > Python Pyodbc 无法通过 executemany 返回第一个插入的 id

问题描述

设置

MWE:我在 SQL Server 中有一张这样的表

CREATE TABLE dbo.MyTable(
    order_id INT IDENTITY(1,1),
    column2 DECIMAL,
    column3 INT
    PRIMARY KEY(order_id)
)

我正在使用 pyodbc 将一些数据以 pandas.DataFrame 的形式插入到表中。我正在使用以下数据:

   column2  column3
0     1.23        5
1     4.95        9
2     6.79       10

我在哪里创建了这个示例数据框

 data = pd.DataFrame({'column2':[1.23, 4.95, 6.79], 'column3':[5,9,10]})

我使用以下语句插入数据

stmt = "INSERT INTO dbo.MyTable(column2, column3) OUTPUT Inserted.order_id VALUES (?, ?)"

问题

这是我用来插入所有内容并返回值的代码:

# Set up connection and create cursor
conn_string = "DRIVER={MyDriver};SERVER=MyServer;DATABASE=MyDb;UID=MyUID;PWD=MyPWD"
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Upload data
cursor.executemany(stmt, data.values.tolist())
# Process the result
try:
    first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
    first_result = None
result_sets = []
while cursor.nextset():
    result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()

但是,我没有得到我应该得到的所有 id!例如,假设表中没有数据,我不会得到

all_inserted_ids = np.array([1, 2, 3])

但我只会得到

all_inserted_ids = np.array([2, 3])

这意味着我在某处丢失了第一个 ID!

请注意,这first_result永远不会奏效。它总是抛出以下内容:

pyodbc.ProgrammingError: No results.  Previous SQL was not a query.

我也尝试过使用cursor.fetchone()cursor.fetchone()[0]或者cursor.fetchval()他们给了我同样的错误。

我尝试过但不起作用的方法

1)添加“SET NOCOUNT ON”

我尝试使用与问题相同的代码,但使用

stmt = 
"""
SET NOCOUNT ON; 
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id 
VALUES (?, ?)
"""

输出是[1, 2]如此我失踪3

2)添加“SET NOCOUNT ON”并将输出插入表变量

我使用了以下语句:

stmt = 
"""
SET NOCOUNT ON; 
DECLARE @NEWID TABLE(ID INT); 
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id INTO @NEWID(ID) 
VALUES (?, ?) 
SELECT ID FROM @NEWID
"""

这同样不起作用,因为我只获得了 '[2, 3]' 但没有获得 '1'。

3)选择@@IDENTITY

我使用了以下语句:

stmt = 
"""
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id 
VALUES (?, ?)
SELECT @@IDENTITY
"""

但它没有像我得到的那样工作array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')]

4)使用 SET NOCOUNT ON 选择 @@IDENTITY

我用了

stmt = 
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id
VALUES (?, ?);
SELECT @@IDENTITY
"""

但我又得到array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')], dtype=object)了。

5)不使用 OUTPUT 选择 @@IDENTITY

我用了:

stmt = 
"""
INSERT INTO dbo.MyTable(column2, column3) 
VALUES (?, ?);
SELECT @@IDENTITY
"""

但我得到了[Decimal('2') Decimal('3')]

6)选择@@IDENTITY 而不使用 OUTPUT 但使用 SET NOCOUNT ON

我用了:

stmt = 
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3) 
VALUES (?, ?);
SELECT @@IDENTITY
"""

但我又得到了:[Decimal('2') Decimal('3')]

解决这个问题的可能方法,这真的很糟糕,但确实有效

一种可能的方法是创建一个新表,我们将在其中存储 id 并在完成后截断它。这太可怕了,但我找不到任何其他解决方案..

创建表:

CREATE TABLE NEWID(
    ID INT
    PRIMARY KEY (ID)
)

接下来是完整的代码:

import pyodbc
import pandas as pd
import numpy as np
# Connect
conn_string = """
DRIVER={MYDRIVER};
SERVER=MYSERVER;
DATABASE=DB;
UID=USER;
PWD=PWD
"""
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Data, Statement, Execution
data = pd.DataFrame({'column2': [1.23, 4.95, 6.79], 'column3': [5, 9, 10]})
stmt = """
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id INTO NEWID(ID)
VALUES (?, ?);
"""
cursor.executemany(stmt, data.values.tolist())
cursor.execute("SELECT ID FROM NEWID;")
# Get stuff
try:
    first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
    first_result = None
result_sets = []
while cursor.nextset():
    result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()
print('First result: ', first_result)
print('All IDs: ', all_inserted_ids)
cursor.commit()
# Remember to truncate the table for next use
cursor.execute("TRUNCATE TABLE dbo.NEWID;", [])
cursor.commit()

这将返回

First result:  [(1, ), (2, ), (3, )]
All IDs:  []

所以我们只保留第一个结果。

标签: pythonsqlsql-serverpyodbc

解决方案


我已经实现了一个类似于你的方法 1)的方法,使用带有 pyodbc 方言的 sqlAlchemy。它可以很容易地直接适应 pyodbc 库。诀窍是SELECT NULL;在插入查询之前有一个。这样插入查询的第一个 OUTPUT 将在返回的集合中。如果您插入n了行,则使用此方法需要2n-1使用游标的nextset(). 这是一个补丁,因为 MSSQL 或 pyodbc 会丢弃第一组。我想知道是否有一个选项是 MSSQL 服务器或 pyodbc,您可以在其中指定返回第一组。

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import TableClause

def bulk_insert_return_defaults_pyodbc(
    session: Session, statement: TableClause, parameters: List[dict], mapping: dict
):
    """

    Parameters
    ----------
    session:
        SqlAlchemy Session object
    statement:
        SqlAlchemy table clause object (ie. Insert)
    parameters:
        List of parameters
        ex: [{"co1": "value1", "col2": "value2"}, {"co1": "value3", "col2": "value4"}]
    mapping
        Mapping between SqlAlchemy declarative base attribute and name of column in 
        database

    Returns
    -------

    """
    if len(parameters) > 0:
        connexion = session.connection()
        context = session.bind.dialect.execution_ctx_cls._init_statement(
            session.bind.dialect,
            connexion,
            connexion._Connection__connection.connection,
            statement,
            parameters,
        )
        statement = context.statement.compile(
            session.bind, column_keys=list(context.parameters[0].keys())
        )
        session.bind.dialect.do_executemany(
            context.cursor,
            "SELECT NULL; " + str(statement),
            [
                tuple(p[p_i] for p_i in statement.params.keys())
                for p in context.parameters
            ],
            context,
        )
        results = []

        while context.cursor.nextset():
            try:
                result = context.cursor.fetchone()
                if result[0] is not None:
                    results.append(result)
            except Exception:
                continue

        return [
            {mapping[r.cursor_description[i][0]]: c for i, c in enumerate(r)}
            for r in results
        ]
    else:
        return []

multi_params = bulk_insert_return_defaults_pyodbc(
    session,
    table_cls.__table__.insert(returning=[table_cls.id]),
    multi_params,
    {
        getattr(table_cls, c).expression.key: c
        for c in list(vars(table_cls))
        if isinstance(getattr(table_cls, c), InstrumentedAttribute)
    },
)

推荐阅读