首页 > 解决方案 > Flask-Executor 和 Flask-SQLAlchemy:无法从执行器函数内部的数据库中获取更新的数据

问题描述

我正在向我的 Web 应用程序添加一个基于 Flask 的 API 来控制一些网络自动化功能的启动和停止。我遇到了一种奇怪的行为,Flask-Executor.submit()方法调用的函数似乎无法从数据库中获取新的或更新的数据。

我知道这个问题非常复杂,所以感谢任何分享他们的时间和意见的人。有关我的项目结构的概述,请参阅此问题的结尾。

烧瓶执行器文档说:

当调用submit()map()Flask-Executor 将使用当前应用程序上下文和当前请求上下文的副本包装 ThreadPoolExecutor 可调用对象

我不太完全理解上下文的含义,但我觉得这可能是一个很好的暗示,说明为什么这应该或不应该起作用。(顺便说一下,我正在使用 ThreadPoolExecutor)。我假设dbSQLAlchemy 对象是应用程序上下文的一部分,因此db应该在 executor 函数中提供一个副本。情况似乎并非如此,因为我仍然必须import db在包含执行程序调用的函数的文件中,正如您将在本文后面看到的那样。

我的前端有简单的开始和停止按钮,它们将 POST 发送到以下 API 路由:

文件:app/api.py

from flask import request
from flask_login import login_required
from app import app, db, executor
from app.models import Project
from datetime import datetime
from automation.Staging import control

@app.route('/api/staging/control', methods=['POST'])
@login_required
def staging_control():
    data = request.json
    project_id = data['project-id']
    action = data['staging-control']

    project = Project.query.get(project_id)
    sp = project.staging_profile
    current_status = sp.status

    if action == 'start':
        if current_status == 'STARTED':
            return {'response': 200, 'message': 'Job already running!'}
        else:
            sp.status = 'STARTED'
            db.session.commit()
            # The executor only spawns the thread if the task status was not already started.
            executor.submit(control.start_staging, project_id)
        
    
    elif action == 'stop':
        if current_status == 'STARTED':
            sp.status = 'STOPPED'
            db.session.commit()

    return {'response' : 200, 'message': 'OK'}

背景

作业的状态存储在数据库模型中。如果发布了start操作,则更新数据库模型的状态列。同样,如果发布了stop操作,则更新数据库模型的状态。

执行器的函数调用control.start_staging生成一个线程,该线程开始一个无限循环,该循环执行一些工作,然后休眠 X 秒。在每次循环开始时,我都会尝试检查数据库模型的状态列,以确定是否要从循环中中断并关闭线程。

启动线程工作得很好。数据库模型得到更新,执行器产生线程,我的 while 循环开始。

从我的前端发送stop动作也可以。数据库中的状态设置为STOPPED,我可以通过手动查询在我的数据库外壳中看到这一点。

但是,control.start_staging最初由 executor 启动的函数仍然认为status设置为STARTED,即使它实际上会STOPPED在线程运行期间的某个时间更新为 。我试图从线程内部尽可能多地获取更新的值。我见过这个类似的问题

这是control.start_staging功能。我在下面的摘录中分享了一些我尝试获取更新状态的不同方法作为评论:

文件:自动化/暂存/control.py

from app import db
from app.models import Project, Staging_Profile
from app.config import STAGING_DURATION_MINS
from datetime import datetime, timedelta
from time import sleep


def start_staging(project_id):    
    project = Project.query.get(project_id)
    print(f"Received START for project {project.project_name}")
    sp = project.staging_profile
    sp.last_activity = datetime.utcnow()
    db.session.commit()
    status = sp.status

    # Staging Loop Start
    while True:

        # This just serves as a force-stop if the job runs for more than STAGING_DURATION_MINUTES minutes.
        if sp.last_activity + timedelta(minutes=STAGING_DURATION_MINS) > datetime.utcnow():
            print(f"Status is: {sp.status}")

            # ATTEMPT 1: does not get updated data
            # status = sp.status

            # ATTEMPT 2: does not get updated data
            # status = Staging_Profile.query.get(project.staging_profile_id).status

            # ATTEMPT 3: does not get updated data
            all_profiles = db.session.query(Staging_Profile).all()
            this_profile = [profile for profile in all_profiles if profile.id == sp.id][0]

            if this_profile.status == 'STOPPED':
                print("Status is STOPPED. Returning")
                break
            
            else:
                print(f"Status is {this_profile.status}")

            # Do work
            do_some_stuff()

        else:
            break
        sleep(5)

    return

现在,真正令人费解的是,我可以从 executor 函数内部将数据写入数据库。sp.last_activity = datetime.utcnow()后面的行db.session.commit()成功写入线程启动的当前时间。

我的怀疑

我以非常模块化的方式构建了这个应用程序,我觉得这可能是问题的根源。

以下是我的应用程序结构的相关部分的概述:

app/
├─ __init__.py   # This is where my db & executor are instantiated
├─ api.py        # This is where the /api/staging/control route lives
├─ models.py     # This holds my SQLAlchemy DB classes
├─ routes.py     # This holds my regular front-end routes
├─ config.py     # General config parameters

automation/
├─ Staging/
│  ├─ control.py    # This is where the function passed to the executor is defined
│  ├─ __init__.py   # Empty
├─ __init__.py      # Empty

再次感谢。当我找到一个解决方案或解决方法时,我将发布此问题。

标签: pythonflasksqlalchemyflask-sqlalchemyconcurrent.futures

解决方案


推荐阅读