Updating 50K+ records in a table *at once* with SQLAlchemy

Problem description

Background

I'm working with a table (in a Postgres database); let's call it Person. It is related to a JobTitle table through an association table, PersonJobTitleAssociation (each person can have multiple job titles).

import datetime

from sqlalchemy import (Column, DateTime, ForeignKey, Integer, String,
                        create_engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import joinedload, relationship, sessionmaker

engine = create_engine(DB_URI)
Base = declarative_base(bind=engine)

class Person(Base):
   __tablename__ = 'person'
   id = Column(Integer, unique=True, primary_key=True)
   name = Column(String, unique=False)
   
   # relationship with all job_titles
   all_job_titles = relationship('JobTitle',
       secondary='person_job_title_association', 
       order_by='desc(person_job_title_association.c.date_created)')
   
   # Update this
   magic_value = Column(String, unique=False)


class PersonJobTitleAssociation(Base):
   __tablename__ = 'person_job_title_association'
   person_id = Column(Integer, ForeignKey('person.id'), primary_key=True)
   job_title_id = Column(Integer, ForeignKey('job_title.id'), primary_key=True)
   date_created = Column(DateTime, nullable=False, default=datetime.datetime.utcnow)

class JobTitle(Base):
   __tablename__ = 'job_title'
   id = Column(Integer, unique=True, primary_key=True)
   name = Column(String, unique=True)


# Once everything is declared, bind to the session
session = sessionmaker(bind=engine)()

The task

For each Person I want to access their most recent JobTitle and apply some_magic_function() to that person's name and job title. (This is a stand-in for "some operation that must be done in Python.")

import random
import string

def some_magic_function(name, job_title):
   """This operation must be done in python"""
   # Update the job_title if blank
   if not job_title:
      job_title = 'unspecified'

   # Get a random character and check if it's in our person's name
   char = random.choice(string.ascii_letters)
   
   if char in name:
      return job_title.upper()
   else:
      return job_title.lower()

I'm updating the values like this (assume the query itself is already optimized and needs no improvement):

query = session.query(Person)\
    .options(joinedload(Person.all_job_titles))\
    .order_by(Person.id)

# operate on all people
for person in query:
   
   # Get and set the magic value
   magic_value = some_magic_function(
       person.name,
       person.all_job_titles[0].name if person.all_job_titles else None)
   if person.magic_value != magic_value:
       person.magic_value = magic_value

# Finally, once complete, commit the session
session.commit()

The problem

Querying and updating the values on the Python side is very fast, but things slow to a crawl when session.commit() is called. From some research, it seems SQLAlchemy locks the entire person table each time a value is updated. On top of that, every update is executed as its own statement (that's 50K separate SQL commands for 50K records).

Desired outcome

I'd like a Pythonic solution that updates all 50K records "in one go".

I've considered using a read_only session, then collecting the updated values in an array of tuples and sending the updates through a with_updates session. That seems like a more SQL-friendly approach, but it's a bit clunky and not straightforward.

Much appreciated!

Tags: python, sqlalchemy

Solution


You could reduce the number of round trips to the database simply by enabling the batched fast-execution helpers, but as a more explicit approach a temporary/derived table of changes can be produced in one way or another:

  • CREATE TEMPORARY TABLE and COPY
  • (VALUES ...) AS ..., possibly with explicit use of execute_values()
  • unnest() over arrays of rows
  • json(b)_to_recordset() from JSON

Any of these lets you send the changes in bulk and perform the update with UPDATE ... FROM:

import csv
from io import StringIO

# Pretending that the query is optimized and deterministic
query = session.query(Person)\
    .options(joinedload(Person.all_job_titles))\
    .order_by(Person.id)

# Prepare data for COPY
changes_csv = StringIO()
changes_writer = csv.writer(changes_csv)
for p in query:
    mv = some_magic_function(
        p.name, p.all_job_titles[0].name if p.all_job_titles else None)
    if p.magic_value != mv:
        changes_writer.writerow((p.id, mv))

changes_csv.seek(0)

session.execute("""
    CREATE TEMPORARY TABLE new_magic (
        person_id INTEGER, 
        value TEXT
    ) ON COMMIT DROP
""")

# Access the underlying psycopg2 connection directly to obtain a cursor
with session.connection().connection.cursor() as cur:
    stmt = "COPY new_magic FROM STDIN WITH CSV"
    cur.copy_expert(stmt, changes_csv)

# Make sure that the planner has proper statistics to work with
session.execute("ANALYZE new_magic ( person_id )")
session.execute("""
    UPDATE person
    SET magic_value = new_magic.value
    FROM new_magic
    WHERE person.id = new_magic.person_id
""")
session.commit()

Not exactly "Pythonic", since it doesn't let the ORM figure out what to do, but on the other hand, explicit is better than implicit.
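The "(VALUES ...) AS ..." alternative from the list above can be sketched with psycopg2's execute_values() helper, which interpolates all the rows into a single statement. This is a sketch, not the answer's tested approach: build_changes() and apply_changes() are hypothetical helper names, while person, magic_value, and session are the ones defined in the question.

```python
def build_changes(rows):
    """Collect (person_id, new_value) pairs, skipping rows that are unchanged.

    Each input row is a (person_id, old_value, new_value) triple.
    """
    return [(pid, new) for pid, old, new in rows if old != new]


def apply_changes(session, changes):
    """Send every change in one UPDATE ... FROM (VALUES ...) statement."""
    from psycopg2.extras import execute_values

    # Access the raw psycopg2 cursor, as in the COPY example above.
    with session.connection().connection.cursor() as cur:
        execute_values(cur, """
            UPDATE person
            SET magic_value = data.value
            FROM (VALUES %s) AS data (person_id, value)
            WHERE person.id = data.person_id
        """, changes)


# Usage (hypothetical), reusing the query from the question:
# changes = build_changes(
#     (p.id, p.magic_value, some_magic_function(p.name, ...)) for p in query)
# apply_changes(session, changes)
# session.commit()
```

Relatedly, the fast-execution helpers mentioned at the top can be enabled on the engine itself, e.g. create_engine(DB_URI, executemany_mode='batch') on SQLAlchemy 1.3+ (the accepted mode names vary between versions); that speeds up the ORM's flush without restructuring the code, though it still issues one UPDATE per row.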

