首页 > 解决方案 > 如何使用 sqlalchemy 将值插入具有序列号的 postgresql 数据库

问题描述

我有一个函数用于更新 PostgreSQL 中的表。通过创建临时表并在完成后将其删除,可以很好地避免重复插入。但是,我有一些带有序列号的表,我必须在一列中传递序列号。否则,我会收到密钥丢失的错误消息。如何在这些表中插入值并自动分配序列键?如果可能的话,我宁愿修改下面的函数。

def export_to_sql(df, table_name):
    from sqlalchemy import create_engine
    engine = create_engine(f'postgresql://{user}:{password}@{host}:5432/{user}')
    df.to_sql(con=engine,
              name='temporary_table',
              if_exists='append',
              index=False,
              method = 'multi')
    with engine.begin() as cnx:
        insert_sql = f'INSERT INTO {table_name} (SELECT * FROM temporary_table) ON CONFLICT DO NOTHING; DROP TABLE temporary_table'
        cnx.execute(insert_sql)

用于创建表的代码

CREATE TABLE symbols
(
 symbol_id serial NOT NULL,
 symbol    varchar(50) NOT NULL,
 CONSTRAINT PK_symbols PRIMARY KEY ( symbol_id )
);


CREATE TABLE tweet_symols(
    tweet_id  varchar(50)  REFERENCES tweets,
    symbol_id   int  REFERENCES symbols,
    PRIMARY KEY (tweet_id, symbol_id),
    UNIQUE (tweet_id, symbol_id)
);

CREATE TABLE hashtags
(
 hashtag_id serial NOT NULL,
 hashtag    varchar(140) NOT NULL,
 CONSTRAINT PK_hashtags PRIMARY KEY ( hashtag_id )
);


CREATE TABLE tweet_hashtags
(
 tweet_id   varchar(50) NOT NULL,
 hashtag_id integer NOT NULL,
 CONSTRAINT FK_344 FOREIGN KEY ( tweet_id ) REFERENCES tweets ( tweet_id )
);

CREATE INDEX fkIdx_345 ON tweet_hashtags
(
 tweet_id
);

表格示例

标签: pythonpostgresqlsqlalchemy

解决方案


INSERT语句未定义目标列,因此 Postgresql 将尝试将值插入定义为SERIAL.

我们可以通过提供一个目标列列表来解决这个问题,省略序列类型。为此,我们使用 SQLAlchemy 从数据库中获取要插入的表的元数据,然后制作目标列的列表。SQLAlchemy 不会告诉我们列是否是使用 创建的SERIAL,但我们会假设它是主键并设置为自动增量。定义的主键列GENERATED ... AS IDENTITY也将被过滤掉 - 这可能是可取的,因为它们的行为方式与SERIAL列相同。

import sqlalchemy as sa

def export_to_sql(df, table_name):
    engine = sa.create_engine(f'postgresql://{user}:{password}@{host}:5432/{user}')
    df.to_sql(con=engine,
              name='temporary_table',
              if_exists='append',
              index=False,
              method='multi')

    # Fetch table metadata from the database
    table = sa.Table(table_name, sa.MetaData(), autoload_with=engine)

    # Get the names of columns to be inserted,
    # assuming auto-incrementing PKs are serial types
    column_names = ','.join(
        [f'"{c.name}"' for c in table.columns 
         if not (c.primary_key and c.autoincrement)]
    )

    with engine.begin() as cnx:


        insert_sql = sa.text(
            f'INSERT INTO {table_name} ({column_names}) (SELECT * FROM temporary_table) ON CONFLICT DO NOTHING; DROP TABLE temporary_table'
        )
        cnx.execute(insert_sql)

推荐阅读