首页 > 解决方案 > 使用 python psycopg2 到 postgres 执行数据更新插入时,哪个是首选选项?

问题描述

我正在尝试在 python 中使用 psycopg2 更新和插入包含大约 3000 万条记录的大型 postgres 表中的行,我正在分批执行 100K 记录(~一批需要 6 分钟),因为我不想打开事务的时间过长以避免创建行锁定,因为在我编写它们时,其他事务也使用表行。我每次都在循环中打开和关闭连接和光标。

因此,通过 postgres 中的游标更新/插入以下哪个(或没有)更可取以避免锁定并获得更好的性能?

1>打开连接以及关闭每个批次的连接。

2>打开和关闭一次连接,但每次批处理都单独打开光标。

如果还有更好的选择,请提出建议。目前我正在使用 cursor.execute 来执行插入/更新查询,但由于性能不那么快,我必须选择批处理。由于我没有足够的权限在插入时删除索引,因此我使用的是批处理路线。

使用的查询:-

更新:-

UPDATE target_tbl tgt
        set descr = stage.descr,
        prod_name = stage.prod_name,
        item_name = stage.item_name,
        url       = stage.url,
        col1_name = stage.col1_name,
        col2_name = stage.col2_name,
        col3_name = stage.col3_name,
        col4_name = stage.col4_name,
        col5_name = stage.col5_name,
        col6_name = stage.col6_name,
        col7_name = stage.col7_name,
        col8_name = stage.col8_name,
        flag      = stage.flag
    from tbl1 stage
    where 
    tgt.col1 = stage.col1
    and tgt.col2 = stage.col2
    and coalesce(tgt.col3, 'col3'::text) = coalesce(stage.col3, 'col3'::text)
    and coalesce(tgt.col4, 'col4'::text) = coalesce(stage.col4, 'col4'::text);
        

插入:-

 Insert into tgt
    select 
    stage.col1,
    stage.col2,
    stage.col3,
    stage.col4
    stage.prod_name,
    stage.item_name,
    stage.url,
    stage.col1_name,
    stage.col2_name,
    stage.col3_name,
    stage.col4_name,
    stage.col5_name,
    stage.col6_name,
    stage.col7_name,
    stage.col8_name,
    stage.flag
    from tbl1 stage
    where NOT EXISTS (
    select from tgt where
    tgt.col1 = stage.col1
    and tgt.col2 = stage.col2
    and coalesce(tgt.col3, 'col3'::text) = coalesce(stage.col3, 'col3'::text)
    and coalesce(tgt.col4, 'col4'::text) = coalesce(stage.col4, 'col4'::text)
    ) ;

标签: pythoncursorpsycopg2query-performancepostgresql-9.1

解决方案


仅使用 SQLAlchemy 的强大功能,无需在 python 端进行 SQL 编码的 Upsert 函数。

您还可以决定批量大小,我使用的批量大小为 1000,但您可以尝试更大的批量。

如果您有一个字典列表和 SQL 表已经包含相同的列名和类型,请考虑使用此函数。如果您使用的是 DataFrame,您只需执行 df.to_dict('records') 即可获得可供输入的字典列表。

确保您的字典键和表列匹配

from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
from sqlalchemy import create_engine
from typing import List, Dict

engine = create_engine(...)


def upsert_database(list_input: List[Dict], engine: sql_engine, table: str, schema: str) -> None:
    if len(list_input) == 0:
        return None
    with engine.connect() as conn:
        base = automap_base()
        base.prepare(engine, reflect=True, schema=schema)
        target_table = Table(table, base.metadata,
                             autoload=True, autoload_with=engine, schema=schema)
        chunks = [list_input[i:i + 1000] for i in range(0, len(list_input), 1000)]
        for chunk in chunks:
            stmt = insert(target_table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            conn.execute(stmt.on_conflict_do_update(
                constraint=f'{table}_pkey',
                set_=update_dict)
            )

推荐阅读