python - 使用 python psycopg2 到 postgres 执行数据更新插入时,哪个是首选选项?
问题描述
我正在尝试在 python 中使用 psycopg2 更新和插入包含大约 3000 万条记录的大型 postgres 表中的行,我正在分批执行 100K 记录(~一批需要 6 分钟),因为我不想打开事务的时间过长以避免创建行锁定,因为在我编写它们时,其他事务也使用表行。我每次都在循环中打开和关闭连接和光标。
因此,通过 postgres 中的游标更新/插入以下哪个(或没有)更可取以避免锁定并获得更好的性能?
1>打开连接以及关闭每个批次的连接。
打开批处理的连接。
打开批处理的光标。
提交批处理的事务。
关闭批处理的光标。
关闭批处理的连接。
2>打开和关闭一次连接,但每次批处理都单独打开光标。
打开连接一次。
打开批处理的光标。
提交批处理的事务。
关闭批处理的光标。
在最后一批之后关闭连接。
如果还有更好的选择,请提出建议。目前我正在使用 cursor.execute 来执行插入/更新查询,但由于性能不那么快,我必须选择批处理。由于我没有足够的权限在插入时删除索引,因此我使用的是批处理路线。
使用的查询:-
更新:-
UPDATE target_tbl tgt
set descr = stage.descr,
prod_name = stage.prod_name,
item_name = stage.item_name,
url = stage.url,
col1_name = stage.col1_name,
col2_name = stage.col2_name,
col3_name = stage.col3_name,
col4_name = stage.col4_name,
col5_name = stage.col5_name,
col6_name = stage.col6_name,
col7_name = stage.col7_name,
col8_name = stage.col8_name,
flag = stage.flag
from tbl1 stage
where
tgt.col1 = stage.col1
and tgt.col2 = stage.col2
and coalesce(tgt.col3, 'col3'::text) = coalesce(stage.col3, 'col3'::text)
and coalesce(tgt.col4, 'col4'::text) = coalesce(stage.col4, 'col4'::text);
插入:-
Insert into tgt
select
stage.col1,
stage.col2,
stage.col3,
stage.col4
stage.prod_name,
stage.item_name,
stage.url,
stage.col1_name,
stage.col2_name,
stage.col3_name,
stage.col4_name,
stage.col5_name,
stage.col6_name,
stage.col7_name,
stage.col8_name,
stage.flag
from tbl1 stage
where NOT EXISTS (
select from tgt where
tgt.col1 = stage.col1
and tgt.col2 = stage.col2
and coalesce(tgt.col3, 'col3'::text) = coalesce(stage.col3, 'col3'::text)
and coalesce(tgt.col4, 'col4'::text) = coalesce(stage.col4, 'col4'::text)
) ;
解决方案
仅使用 SQLAlchemy 的强大功能,无需在 python 端进行 SQL 编码的 Upsert 函数。
您还可以决定批量大小,我使用的批量大小为 1000,但您可以尝试更大的批量。
如果您有一个字典列表和 SQL 表已经包含相同的列名和类型,请考虑使用此函数。如果您使用的是 DataFrame,您只需执行 df.to_dict('records') 即可获得可供输入的字典列表。
确保您的字典键和表列匹配
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
from sqlalchemy import create_engine
from typing import List, Dict
engine = create_engine(...)
def upsert_database(list_input: List[Dict], engine: sql_engine, table: str, schema: str) -> None:
if len(list_input) == 0:
return None
with engine.connect() as conn:
base = automap_base()
base.prepare(engine, reflect=True, schema=schema)
target_table = Table(table, base.metadata,
autoload=True, autoload_with=engine, schema=schema)
chunks = [list_input[i:i + 1000] for i in range(0, len(list_input), 1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
conn.execute(stmt.on_conflict_do_update(
constraint=f'{table}_pkey',
set_=update_dict)
)
推荐阅读
- python - Dm 每个有角色的人 discord.py
- node.js - 使用 lowdb 会导致 ERR_REQUIRE_ESM
- javascript - 动态更新变量名
- python - 如何从另一个类方法继承变量
- r - 如何绑定列表中的 data.frames 行并添加一个包含列表元素名称的新列?
- json - 数据类型映射:提取几个原子值
- flutter - 在flutter getx中对空值使用空检查运算符
- javascript - html javascript在主页中存储多个页面
- rust - 在 gtk-rs 中,如何获取 gtk::Window (大部分)在的当前屏幕?
- c - 如何检查一个大的二进制文件是否包含另一个文件的内容?