首页 > 解决方案 > SQLAlchemy 会话在 celery 作业和 on_success 函数中清除

问题描述

我正在构建一个工具,它可以从不同的数据库中获取数据,对其进行转换,并将其存储在我自己的数据库中。我正在从 APScheduler 迁移到 Celery,但遇到了以下问题:

当作业运行时,我使用调用的类JobRecords来存储作业是否成功以及遇到了哪些错误。我用它来知道不要回头看太远的更新条目,特别是因为有些表有数百万行。

由于系统对于所有工作都是相同的,因此我从 celeryTask对象创建了一个子类。我确保作业在 Flask 应用程序上下文中执行,并获取此作业成功完成的最新时间。我还确保我注册了一个值,now以避免查询数据库和添加作业记录之间的时间问题。

class RecordedTask(Task):
  """
  Task sublass that uses JobRecords to get the last run date
  and add new JobRecords on completion
  """
  now: datetime = None
  ignore_result = True

  _session: scoped_session = None
  success: bool = True
  info: dict = None

  @property
  def session(self) -> Session:
    """Making sure we have one global session instance"""
    if self._session is None:
      from app.extensions import db
      self._session = db.session
    return self._session

  def __call__(self, *args, **kwargs):
    from app.models import JobRecord

    kwargs['last_run'] = (
        self.session.query(func.max(JobRecord.run_at_))
        .filter(JobRecord.job_id == self.name, JobRecord.success)
        .first()
    )[0] or datetime.min
    self.now = kwargs['now'] = datetime.utcnow()

    with app.app_context():
      super(RecordedTask, self).__call__(*args, **kwargs)

  def on_failure(self, exc, task_id, args: list, kwargs: dict, einfo):
    self.session.rollback()
    self.success = False
    self.info = dict(
        args=args,
        kwargs=kwargs,
        error=exc.args,
        exc=format_exception(exc.__class__, exc, exc.__traceback__),
    )
    app.logger.error(f"Error executing job '{self.name}': {exc}")

  def on_success(self, retval, task_id, args: list, kwargs: dict):
    app.logger.info(f"Executed job '{self.name}' successfully, adding JobRecord")

    for entry in self.to_trigger:
      if len(entry) == 2:
        job, kwargs = entry
      else:
        job, = entry
        kwargs = {}
      app.logger.info(f"Scheduling job '{job}'")
      current_celery_app.signature(job, **kwargs).delay()

  def after_return(self, *args, **kwargs):
    from app.models import JobRecord
    record = JobRecord(
        job_id=self.name,
        run_at_=self.now,
        info=self.info,
        success=self.success
    )
    self.session.add(record)
    self.session.commit()
    self.session.remove()

我添加了一个工作示例来更新一个名为 的模型Location,但是有很多工作就像这个一样。

@celery.task(bind=True, name="update_locations")
def update_locations(self, last_run: datetime = datetime.min, **_):
  """Get the locations from the external database and check for updates"""
  locations: List[ExternalLocation] = ExternalLocation.query.filter(
      ExternalLocation.updated_at_ >= last_run
  ).order_by(ExternalLocation.id).all()

  app.logger.info(f"ExternalLocation: collected {len(locations)} updated locations")
  for update_location in locations:
    existing_location: Location = Location.query.filter(
        Location.external_id == update_location.id
    ).first()

    if existing_location is None:
      self.session.add(Location.from_worker(update_location))
    else:
      existing_location.update_from_worker(update_location)

问题是当我运行这个作业时,Location对象没有被提交JobRecord,所以只创建了后者。如果我用调试器跟踪它,Location.query.count()在函数内返回正确的值,但是一旦它进入on_success回调,它就会回到 0,并self._session.new返回一个空字典。

我已经尝试将会话添加为属性以确保它在任何地方都是相同的实例,但问题仍然存在。也许这与它scoped_session有关Flask-SQLAlchemy

抱歉,代码量很大,我确实尝试尽可能多地剥离。欢迎任何帮助!

标签: pythonflasksqlalchemyceleryflask-sqlalchemy

解决方案


我发现罪魁祸首是scoped_sessionFlask 应用程序上下文的组合。像任何上下文管理器一样,运行代码会在离开时with app.app_context()触发__exit__函数,这反过来会导致存储ScopedRegistryscoped_session被清除。然后,创建了一个新会话,将JobRecords其添加到该会话中,并提交了该会话。因此,这些位置不会被写入数据库。

有两种可能的解决方案。如果您不在任务中的其他文件中使用会话,则可以将会话属性添加到任务中。这样,您就可以完全避免,并且可以在您的函数scoped_session中进行清理。after_return

 @property 
 def session(self):
   if self._session is None:
      from dashboard.extensions import db
      self._session = db.create_session(options={})()
    return self._session

但是,我也在我的模型定义文件中访问会话,通过from extensions import db. 因此,我使用了两个不同的会话。我最终使用app.app_context().push()而不是上下文管理器,从而避免了该__exit__功能

  app.app_context().push()
  super(RecordedTask, self).__call__(*args, **kwargs)

推荐阅读