python - 返回 ID 时批量保存和更新
问题描述
所以我正在使用我正在sqlalchemy
做的一个项目。我遇到了一个问题,我最终每小时都会有数千条记录需要保存。这些记录可能会被插入或更新。我一直在使用bulk_save_objects
它,效果很好。但是现在我必须为这些正在保存的记录引入历史记录,这意味着我需要返回 ID,以便我可以将这些条目链接到历史记录表中的条目。我知道使用return_defaults
,并且有效。但是,它引入了一个问题,即我bulk_save_objects
一次插入和更新一个条目,而不是批量,这消除了目的。是否有另一种选择,我可以同时批量插入和更新,但保留 ID?
解决方案
可以使用类似于此处答案中描述的技术来实现所需的结果,方法是将行上传到临时表,然后执行 UPDATE,然后执行返回插入的 ID 值的 INSERT。对于 SQL Server,这将是 INSERT 语句上的 OUTPUT 子句:
main_table = "team"
# <set up test environment>
with engine.begin() as conn:
conn.execute(sa.text(f"DROP TABLE IF EXISTS [{main_table}]"))
conn.execute(
sa.text(
f"""
CREATE TABLE [dbo].[{main_table}](
[id] [int] IDENTITY(1,1) NOT NULL,
[prov] [varchar](2) NOT NULL,
[city] [varchar](50) NOT NULL,
[name] [varchar](50) NOT NULL,
[comments] [varchar](max) NULL,
CONSTRAINT [PK_team] PRIMARY KEY CLUSTERED
(
[id] ASC
)
)
"""
)
)
conn.execute(
sa.text(
f"""
CREATE UNIQUE NONCLUSTERED INDEX [UX_team_prov_city] ON [dbo].[{main_table}]
(
[prov] ASC,
[city] ASC
)
"""
)
)
conn.execute(
sa.text(
f"""
INSERT INTO [{main_table}] ([prov], [city], [name])
VALUES ('AB', 'Calgary', 'Flames')
"""
)
)
# <data for upsert>
df = pd.DataFrame(
[
("AB", "Calgary", "Flames", "hard-working, handsome lads"),
("AB", "Edmonton", "Oilers", "ruffians and scalawags"),
],
columns=["prov", "city", "name", "comments"],
)
# <perform upsert, returning IDs>
temp_table = "#so65525098"
with engine.begin() as conn:
df.to_sql(temp_table, conn, index=False, if_exists="replace")
conn.execute(
sa.text(
f"""
UPDATE main SET main.name = temp.name,
main.comments = temp.comments
FROM [{main_table}] main INNER JOIN [{temp_table}] temp
ON main.prov = temp.prov AND main.city = temp.city
"""
)
)
inserted = conn.execute(
sa.text(
f"""
INSERT INTO [{main_table}] (prov, city, name, comments)
OUTPUT INSERTED.prov, INSERTED.city, INSERTED.id
SELECT prov, city, name, comments FROM [{temp_table}] temp
WHERE NOT EXISTS (
SELECT * FROM [{main_table}] main
WHERE main.prov = temp.prov AND main.city = temp.city
)
"""
)
).fetchall()
print(inserted)
"""console output:
[('AB', 'Edmonton', 2)]
"""
# <check results>
with engine.begin() as conn:
pprint(conn.execute(sa.text(f"SELECT * FROM {main_table}")).fetchall())
"""console output:
[(1, 'AB', 'Calgary', 'Flames', 'hard-working, handsome lads'),
(2, 'AB', 'Edmonton', 'Oilers', 'ruffians and scalawags')]
"""
推荐阅读
- json - 将 JSONObject 发送到服务器并获取 String volley
- swagger - 如何从 Avro 模式生成 OpenAPI Swagger
- elasticsearch - Elastic OR-options 它没有像我预期的那样工作
- javascript - 在 React-Native 中调用 interstitialAd.load() 方法时应用程序崩溃
- c++ - 即使包含 -ldl (g++),仍然存在“对 dladdr 的未定义引用”链接错误
- ubuntu - 可以普遍执行严格的运输安全吗?
- python - 多处理 python 视频服务器
- processing - 循环和重复模式
- r - 根据子组为组分配值
- amazon-web-services - 数据存储方式为s3的HBase文件夹结构