python - 使用 SqlAlchemy *一次*更新表中的 50K+ 记录
问题描述
背景
我正在处理一张表(在 postgres 数据库中),我们称之为Person
。它JobTitle
通过关联表与表相关PersonJobTitleAssociation
。(每个人可以有多个职位。)
engine = create_engine(DB_URI)
Base = declarative_base(engine)
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, unique=True, primary_key=True)
name = Column(String, unique=False)
# relationship with all job_titles
all_job_titles = relationship('JobTitle',
secondary='person_job_title_association',
order_by='desc(person_job_title_association.c.date_created)')
# Update this
magic_value = Column(String, unique=False)
class PersonJobTitleAssociation(Base):
__tablename__ = 'person_job_title_association'
person_id = Column(Integer, ForeignKey('person.id'), primary_key=True)
job_title_id = Column(Integer, ForeignKey('job_title.id'), primary_key=True)
date_created = Column(DateTime, nullable=False, default=datetime.datetime.utcnow)
class JobTitle(Base):
__tablename__ = 'job_title'
id = Column(Integer, unique=True, primary_key=True)
name = Column(String, unique=True)
# Once everything is declared, bind to the session
session = sessionmaker(bind=engine)()
问题
我想访问每个人的Person
最新信息JobTitle
并对some_magic_function()
这个人的name
和job title
. (掩码“某些必须在 python 中完成的操作”)。
import random
import string
def some_magic_function(name, job_title):
"""This operation must be done in python"""
# Update the job_title if blank
if not job_title:
job_title = 'unspecified'
# Get a random character and check if it's in our person's name
char = random.choice(string.ascii_letters)
if char in name:
return job_title.upper()
else:
return job_title.lower()
我正在像这样更新值:(
假设这个查询已经过优化,不需要改进)
query = session.query(Person)\
.options(joinedload(Person.all_job_titles))\
.order_by(Person.id)
# operate on all people
for person in query:
# Get and set the magic value
magic_value = some_magic_function(person.name, person.all_job_titles[0])
if person.magic_value != magic_value:
person.magic_value = magic_value
# Finally, once complete, commit the session
session.commit()
问题
在 python 端查询和更新值非常快。但是调用时事情变得非常缓慢session.commit()
。做了一些研究,似乎每次更新值时都会sqlalchemy
锁定整个表。person
此外,每个更新都作为其自己的命令执行。(这是针对 50K 记录的 50K 独立 SQL 命令。)
期望的结果
我想要一个 Pythonic 解决方案,它可以“一口气”更新所有 50K 记录。
我考虑过使用read_only
会话,然后将更新值传递到元组数组中并通过with_updates
会话发送更新。这似乎是一种对 SQL 更友好的方法,但有点笨拙且不直截了当。
非常感激!
解决方案
您可以通过简单地启用批处理快速执行助手来减少到数据库的往返次数,但作为一种更明确的方法,可以以一种或另一种方式生成临时/派生的更改表:
CREATE TEMPORARY TABLE
和COPY
(VALUES ...) AS ...
,可能与显式使用execute_values()
unnest()
行数组- 从 JSON 使用
json(b)_to_recordset()
允许您批量发送更改,并执行以下操作UPDATE ... FROM
:
import csv
from io import StringIO
# Pretending that the query is optimized and deterministic
query = session.query(Person)\
.options(joinedload(Person.all_job_titles))\
.order_by(Person.id)
# Prepare data for COPY
changes_csv = StringIO()
changes_writer = csv.writer(changes_csv)
for p in query:
mv = some_magic_function(p.name, p.all_job_titles[0])
if p.magic_value != mv:
changes_writer.writerow((p.id, mv))
changes_csv.seek(0)
session.execute("""
CREATE TEMPORARY TABLE new_magic (
person_id INTEGER,
value TEXT
) ON COMMIT DROP
""")
# Access the underlying psycopg2 connection directly to obtain a cursor
with session.connection().connection.cursor() as cur:
stmt = "COPY new_magic FROM STDIN WITH CSV"
cur.copy_expert(stmt, changes_csv)
# Make sure that the planner has proper statistics to work with
session.execute("ANALYZE new_magic ( person_id )")
session.execute("""
UPDATE person
SET magic_value = new_magic.value
FROM new_magic
WHERE person.id = new_magic.person_id
""")
session.commit()
不完全是“Pythonic”,因为它不会让 ORM 弄清楚要做什么,但另一方面,显式优于隐式。
推荐阅读
- angularjs - 更改语言 - 屏幕变白
- kubernetes - 临时存储垃圾收集
- python - paho.mqtt.client 订阅后收不到消息
- javascript - Chart.js 如何获取 X 轴的最小值-最大值?
- python - MinMaxScaler + 具有数值和分类数据的决策树分类器
- angular - ngFor 在现有应用程序中不工作,将在新创建的应用程序中工作
- node.js - 从 URL 获取对象属性值
- javascript - 如何限制可拖动区域?它在顶部和左侧起作用,但在右侧和底部不起作用
- ruby-on-rails - 表单远程:trueauthentity_token 消失
- javascript - 如何让 jQuery 对话框按钮超出对话框并彼此相邻显示?