首页 > 解决方案 > 如何在金字塔 Web 应用程序中手动提交 sqlalchemy 数据库事务?

问题描述

我有一个 Pyramid Web 应用程序,在将更改提交到 sqlalchemy 数据库后需要运行 Celery 任务。我知道我可以使用 request.tm.get().addAfterCommitHook() 来做到这一点。但是,这对我不起作用,因为我还需要在视图中使用 celery 任务的 task_id。因此,在我对 Celery 任务调用 task.delay() 之前,我需要对数据库进行更改。

zope.sqlalchemy 文档说我可以使用 transaction.commit() 手动提交。但是,这对我不起作用;celery 任务在更改提交到数据库之前运行,即使我在调用 task.delay() 之前调用了 transaction.commit()

我的金字塔视图代码如下所示:

ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)

# Flush dbsession so ride gets an id assignment
dbsession.flush()

# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))

# Commit ride to database
import transaction
transaction.commit()

# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)

url = self.request.route_url('rides')
return HTTPFound(
    url,
    content_type='application/json',
    charset='',
    text=json.dumps(
        {'ride_id':ride_id,
         'update_weather_task_id':update_weather_task.task_id}))

我的芹菜任务如下所示:

@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id, train_model=True):

    from ..celery import session_factory
    
    logger.debug('Received update weather task for ride {}'.format(ride_id))

    dbsession=session_factory()
    dbsession.expire_on_commit=False

    with transaction.manager:
        ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()

celery 任务因 NoResultFound 而失败:

  File "/app/cycling_data/processing/weather.py", line 478, in update_ride_weather
    ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3282, in one
    raise orm_exc.NoResultFound("No row was found for one()")

当我事后检查数据库时,我看到记录实际上是在 celery 任务运行并失败之后创建的。所以这意味着 transaction.commit() 没有按预期提交事务,而是在视图返回后由 zope.sqlalchemy 机器自动提交更改。如何在我的视图代码中手动提交事务?

标签: pythonsqlalchemycelerypyramid

解决方案


request.tmpyramid_tmthreadlocaltransaction.manager对象或 per-request 对象定义,具体取决于您的配置方式pyramid_tm(查找pyramid_tm.manager_hook在某处定义以确定正在使用的对象。

您的问题很棘手,因为您所做的任何事情都应该适合pyramid_tm以及它期望事情如何运作。具体来说,它计划在请求的生命周期内控制事务——提早提交对于该事务不是一个好主意。pyramid_tm如果在请求的生命周期中的任何地方发生任何故障 - 不仅仅是在您的视图可调用中,它正在尝试帮助提供故障保护功能以回滚整个请求。

选项1:

无论如何,尽早提交。如果您要这样做,那么提交后的失败将无法回滚已提交的数据,因此您可能会部分提交请求。好的,很好,这是你的问题,所以答案是使用request.tm.commit()可能后跟 arequest.tm.begin()来为任何后续更改开始一个新的。您还需要注意不要跨该边界共享 sqlalchemy 托管对象,例如request.user等,因为它们需要刷新/合并到新事务中(默认情况下,SQLAlchemy 的身份缓存不能信任从不同事务加载的数据,因为那只是隔离级别如何工作)。

选项 2:

为您想要提前提交的数据启动一个单独的事务。好的,所以假设您没有使用任何 threadlocals transaction.manager,或者scoped_session您可以开始自己的事务并提交它,而无需触及dbsessionpyramid_tm. 一些适用于 pyramid-cookiecutter-starter 项目结构的通用代码可能是:

from myapp.models import get_tm_session

tmp_tm = transaction.TransactionManager(explicit=True)
with tmp_tm:
    dbsession_factory = request.registry['dbsession_factory']
    tmp_dbsession = get_tm_session(dbsession_factory, tmp_tm)
    # ... do stuff with tmp_dbsession that is committed in this with-statement
    ride = appstruct_to_ride(tmp_dbsession, appstruct)
    # do not use this ride object outside of the with-statement
    tmp_dbsession.add(ride)
    tmp_dbsession.flush()
    ride_id = ride.id

# we are now committed so go ahead and start your background worker
update_weather_task = update_ride_weather.delay(ride_id)

# maybe you want the ride object outside of the tmp_dbsession
ride = dbsession.query(Ride).filter(Ride.id==ride_id).one()

return {...}

这还不错 - 可能是在不将 celery 连接到 pyramid_tm 控制的 dbsession 的情况下,就故障模式而言,你能做的最好的事情。


推荐阅读