python - 如何使用 sqlalchemy 将值插入具有序列号的 postgresql 数据库
问题描述
我有一个函数用于更新 PostgreSQL 中的表。通过创建临时表并在完成后将其删除,可以很好地避免重复插入。但是,我有一些带有序列号的表,我必须在一列中传递序列号。否则,我会收到密钥丢失的错误消息。如何在这些表中插入值并自动分配序列键?如果可能的话,我宁愿修改下面的函数。
def export_to_sql(df, table_name):
from sqlalchemy import create_engine
engine = create_engine(f'postgresql://{user}:{password}@{host}:5432/{user}')
df.to_sql(con=engine,
name='temporary_table',
if_exists='append',
index=False,
method = 'multi')
with engine.begin() as cnx:
insert_sql = f'INSERT INTO {table_name} (SELECT * FROM temporary_table) ON CONFLICT DO NOTHING; DROP TABLE temporary_table'
cnx.execute(insert_sql)
用于创建表的代码
CREATE TABLE symbols
(
symbol_id serial NOT NULL,
symbol varchar(50) NOT NULL,
CONSTRAINT PK_symbols PRIMARY KEY ( symbol_id )
);
CREATE TABLE tweet_symols(
tweet_id varchar(50) REFERENCES tweets,
symbol_id int REFERENCES symbols,
PRIMARY KEY (tweet_id, symbol_id),
UNIQUE (tweet_id, symbol_id)
);
CREATE TABLE hashtags
(
hashtag_id serial NOT NULL,
hashtag varchar(140) NOT NULL,
CONSTRAINT PK_hashtags PRIMARY KEY ( hashtag_id )
);
CREATE TABLE tweet_hashtags
(
tweet_id varchar(50) NOT NULL,
hashtag_id integer NOT NULL,
CONSTRAINT FK_344 FOREIGN KEY ( tweet_id ) REFERENCES tweets ( tweet_id )
);
CREATE INDEX fkIdx_345 ON tweet_hashtags
(
tweet_id
);
解决方案
该INSERT
语句未定义目标列,因此 Postgresql 将尝试将值插入定义为SERIAL
.
我们可以通过提供一个目标列列表来解决这个问题,省略序列类型。为此,我们使用 SQLAlchemy 从数据库中获取要插入的表的元数据,然后制作目标列的列表。SQLAlchemy 不会告诉我们列是否是使用 创建的SERIAL
,但我们会假设它是主键并设置为自动增量。定义的主键列GENERATED ... AS IDENTITY
也将被过滤掉 - 这可能是可取的,因为它们的行为方式与SERIAL
列相同。
import sqlalchemy as sa
def export_to_sql(df, table_name):
engine = sa.create_engine(f'postgresql://{user}:{password}@{host}:5432/{user}')
df.to_sql(con=engine,
name='temporary_table',
if_exists='append',
index=False,
method='multi')
# Fetch table metadata from the database
table = sa.Table(table_name, sa.MetaData(), autoload_with=engine)
# Get the names of columns to be inserted,
# assuming auto-incrementing PKs are serial types
column_names = ','.join(
[f'"{c.name}"' for c in table.columns
if not (c.primary_key and c.autoincrement)]
)
with engine.begin() as cnx:
insert_sql = sa.text(
f'INSERT INTO {table_name} ({column_names}) (SELECT * FROM temporary_table) ON CONFLICT DO NOTHING; DROP TABLE temporary_table'
)
cnx.execute(insert_sql)
推荐阅读
- python - 如何在 Jupyter Notebook 的 API 有效负载中使用变量
- selenium - 无法将测试执行结果传递给 Robo 框架中的变量
- angular - 当我从 web 服务填充数据时,Angular6 和 ng2-charts 不显示任何图表
- mysql - mysql全文通配符搜索,如正则表达式
- c# - c#中的预处理技术
- python - Python - 数据清洗
- arrays - Applescript 比较同一数组中的项目
- sql - 更新查询给出奇怪的错误“CfPlnUt_Aemk5Mr77-AevA2”
- pagination - Gremlin 中的分页
- python - 这个错误 TypeError: 'Button' object is not callable 是什么意思?