首页 > 解决方案 > 如果 session.add 在 sql alchemy ORM 中引发重复键错误,如何更新行

问题描述

    engine = db_connect()
    create_table(engine)
    Session = sessionmaker(bind=engine)
    self.session = Session()

    settings = get_project_settings()
    stock_engine = create_engine(settings.get('STOCK_DATABASE_URI'), echo=True,pool_recycle=1800)
    StockSession = sessionmaker(bind=stock_engine)
    self.stock_session = StockSession()

def process_item(self, item, spider):
    sleep(0.3)
    item_table.uni_identifier = item['uni_identifier']
    
    symbols = self.stock_session.execute("SELECT * from table3")
    
    for row in symbols:
        stock_table = Trial_table()
        stock_table.stock_symbol = row["symbol"]
        stock_table.company_name = row["companyName"]
        stock_table.last_updated = arrow.utcnow().to('local').format('YYYY-MM-DD HH:mm:ss')
        
        self.session.add(stock_table)
        
        try:
            self.session.commit()
        except:
            self.session.rollback()
            raise

    self.session.add(item_table)
    try:
       self.session.commit()
    except:
       self.session.rollback()
       raise

sqlalchemy.exc.IntegrityError:(pymysql.err.IntegrityError)(1062,“重复条目'GNLN'键'trial_table.stock_symbol_UNIQUE'”)

上面的错误是在 session.add(stock_table) 处生成的。我想要实现的是如果 session.add(stock_table) 生成了重复的键错误,我想更新而不是插入。我怎样才能做到这一点?已经尝试使用 self.session.merge(stock_table)。它不工作。Trial_table() 是数据库中各个表的 ORM。

标签: python-3.xsqlalchemy

解决方案


对于单线程应用程序,最简单的方法是捕获IntegrityError并尝试更新(IntegrityError在提交(或刷新)会话时发生)。

    session.add(Trial_Table(stock_symbol=symbol, company_name=company_name))
    try:
        session.commit()
    except sa.exc.IntegrityError:
        session.rollback()
        instance = session.query(Trial_Table).filter(Trial_Table.stock_symbol == symbol).first()
        instance.company_name = company_name
        try:
            session.commit()
        except:
            session.rollback()
            raise
    except:
        session.rollback()
        raise

丑陋,但有效。

使用 MySQL 的“ON DUPLICATE KEY ... UPDATE”功能将是另一种方式。请注意,ORM功能是实验性的。它可能看起来像这样(未经测试,因为我的 Mariadb 版本不支持RETURNING)。

from sqlalchemy import select, update

...

stmt = update(Trial_Table).where(Trial_Table.stock_symbol == symbol).values(company_name=company_name).returning(Trial_Table)
orm_stmt = select(Trial_Table).from_statement(stmt).execution_options(populate_existing=True)
session.execute(orm_stmt)
try:
    session.commit()
except:
    session.rollback()
    raise    

或者,您可以使用 SQLAlchemy 核心语法来执行upsert,而不必依赖对RETURNING.

from sqlalchemy.dialects.mysql import insert
...

# Operate on the model's __table__ attribute
stmt = insert(Trial_Table.__table__).values(stock_symbol=symbol, company_name=company_name)
on_duplicate_key_stmt = stmt.on_duplicate_key_update(company_name=company_name)
session.execute(on_duplicate_key_stmt)
try:
    session.commit()
except:
    session.rollback()
    raise    

推荐阅读