python - 如何在金字塔 Web 应用程序中手动提交 sqlalchemy 数据库事务?
问题描述
我有一个 Pyramid Web 应用程序,在将更改提交到 sqlalchemy 数据库后需要运行 Celery 任务。我知道我可以使用 request.tm.get().addAfterCommitHook() 来做到这一点。但是,这对我不起作用,因为我还需要在视图中使用 celery 任务的 task_id。因此,在我对 Celery 任务调用 task.delay() 之前,我需要对数据库进行更改。
zope.sqlalchemy 文档说我可以使用 transaction.commit() 手动提交。但是,这对我不起作用;celery 任务在更改提交到数据库之前运行,即使我在调用 task.delay() 之前调用了 transaction.commit()
我的金字塔视图代码如下所示:
ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)
# Flush dbsession so ride gets an id assignment
dbsession.flush()
# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))
# Commit ride to database
import transaction
transaction.commit()
# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)
url = self.request.route_url('rides')
return HTTPFound(
url,
content_type='application/json',
charset='',
text=json.dumps(
{'ride_id':ride_id,
'update_weather_task_id':update_weather_task.task_id}))
我的芹菜任务如下所示:
@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id, train_model=True):
from ..celery import session_factory
logger.debug('Received update weather task for ride {}'.format(ride_id))
dbsession=session_factory()
dbsession.expire_on_commit=False
with transaction.manager:
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
celery 任务因 NoResultFound 而失败:
File "/app/cycling_data/processing/weather.py", line 478, in update_ride_weather
ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3282, in one
raise orm_exc.NoResultFound("No row was found for one()")
当我事后检查数据库时,我看到记录实际上是在 celery 任务运行并失败之后创建的。所以这意味着 transaction.commit() 没有按预期提交事务,而是在视图返回后由 zope.sqlalchemy 机器自动提交更改。如何在我的视图代码中手动提交事务?
解决方案
request.tm
由pyramid_tm
threadlocaltransaction.manager
对象或 per-request 对象定义,具体取决于您的配置方式pyramid_tm
(查找pyramid_tm.manager_hook
在某处定义以确定正在使用的对象。
您的问题很棘手,因为您所做的任何事情都应该适合pyramid_tm
以及它期望事情如何运作。具体来说,它计划在请求的生命周期内控制事务——提早提交对于该事务不是一个好主意。pyramid_tm
如果在请求的生命周期中的任何地方发生任何故障 - 不仅仅是在您的视图可调用中,它正在尝试帮助提供故障保护功能以回滚整个请求。
选项1:
无论如何,尽早提交。如果您要这样做,那么提交后的失败将无法回滚已提交的数据,因此您可能会部分提交请求。好的,很好,这是你的问题,所以答案是使用request.tm.commit()
可能后跟 arequest.tm.begin()
来为任何后续更改开始一个新的。您还需要注意不要跨该边界共享 sqlalchemy 托管对象,例如request.user
等,因为它们需要刷新/合并到新事务中(默认情况下,SQLAlchemy 的身份缓存不能信任从不同事务加载的数据,因为那只是隔离级别如何工作)。
选项 2:
为您想要提前提交的数据启动一个单独的事务。好的,所以假设您没有使用任何 threadlocals transaction.manager
,或者scoped_session
您可以开始自己的事务并提交它,而无需触及dbsession
由pyramid_tm
. 一些适用于 pyramid-cookiecutter-starter 项目结构的通用代码可能是:
from myapp.models import get_tm_session
tmp_tm = transaction.TransactionManager(explicit=True)
with tmp_tm:
dbsession_factory = request.registry['dbsession_factory']
tmp_dbsession = get_tm_session(dbsession_factory, tmp_tm)
# ... do stuff with tmp_dbsession that is committed in this with-statement
ride = appstruct_to_ride(tmp_dbsession, appstruct)
# do not use this ride object outside of the with-statement
tmp_dbsession.add(ride)
tmp_dbsession.flush()
ride_id = ride.id
# we are now committed so go ahead and start your background worker
update_weather_task = update_ride_weather.delay(ride_id)
# maybe you want the ride object outside of the tmp_dbsession
ride = dbsession.query(Ride).filter(Ride.id==ride_id).one()
return {...}
这还不错 - 可能是在不将 celery 连接到 pyramid_tm 控制的 dbsession 的情况下,就故障模式而言,你能做的最好的事情。
推荐阅读
- c++ - 一个关于骰子概率计算的C++问题
- windows - bat 文件输出的表格表示
- netsuite - 如何使用 API 在 Netsuite 中触发脚本
- time - 将 @time 报告的 Julia 中大型向量的分配翻倍
- java - Storm 拓扑不在集群中执行
- database-design - 构建数据库关系以跟踪应用程序设置的不同变化
- ios - 支持 iOS 12 和 13 时的 AppDelegate 和 SceneDelegate
- javascript - 使用 ShinyJS 初始化隐藏元素
- java - 如何解决“buyTickets”类
- rust - 为什么使用 Option unwrap 不移动值?