python - Flask-Executor 和 Flask-SQLAlchemy:无法从执行器函数内部的数据库中获取更新的数据
问题描述
我正在向我的 Web 应用程序添加一个基于 Flask 的 API 来控制一些网络自动化功能的启动和停止。我遇到了一种奇怪的行为,Flask-Executor.submit()
方法调用的函数似乎无法从数据库中获取新的或更新的数据。
我知道这个问题非常复杂,所以感谢任何分享他们的时间和意见的人。有关我的项目结构的概述,请参阅此问题的结尾。
烧瓶执行器文档说:
当调用
submit()
或map()
Flask-Executor 将使用当前应用程序上下文和当前请求上下文的副本包装 ThreadPoolExecutor 可调用对象
我不太完全理解上下文的含义,但我觉得这可能是一个很好的暗示,说明为什么这应该或不应该起作用。(顺便说一下,我正在使用 ThreadPoolExecutor)。我假设db
SQLAlchemy 对象是应用程序上下文的一部分,因此db
应该在 executor 函数中提供一个副本。情况似乎并非如此,因为我仍然必须import db
在包含执行程序调用的函数的文件中,正如您将在本文后面看到的那样。
我的前端有简单的开始和停止按钮,它们将 POST 发送到以下 API 路由:
文件:app/api.py
from flask import request
from flask_login import login_required
from app import app, db, executor
from app.models import Project
from datetime import datetime
from automation.Staging import control
@app.route('/api/staging/control', methods=['POST'])
@login_required
def staging_control():
data = request.json
project_id = data['project-id']
action = data['staging-control']
project = Project.query.get(project_id)
sp = project.staging_profile
current_status = sp.status
if action == 'start':
if current_status == 'STARTED':
return {'response': 200, 'message': 'Job already running!'}
else:
sp.status = 'STARTED'
db.session.commit()
# The executor only spawns the thread if the task status was not already started.
executor.submit(control.start_staging, project_id)
elif action == 'stop':
if current_status == 'STARTED':
sp.status = 'STOPPED'
db.session.commit()
return {'response' : 200, 'message': 'OK'}
背景
作业的状态存储在数据库模型中。如果发布了start
操作,则更新数据库模型的状态列。同样,如果发布了stop
操作,则更新数据库模型的状态。
执行器的函数调用control.start_staging
生成一个线程,该线程开始一个无限循环,该循环执行一些工作,然后休眠 X 秒。在每次循环开始时,我都会尝试检查数据库模型的状态列,以确定是否要从循环中中断并关闭线程。
启动线程工作得很好。数据库模型得到更新,执行器产生线程,我的 while 循环开始。
从我的前端发送stop
动作也可以。数据库中的状态设置为STOPPED
,我可以通过手动查询在我的数据库外壳中看到这一点。
但是,control.start_staging
最初由 executor 启动的函数仍然认为status
设置为STARTED
,即使它实际上会STOPPED
在线程运行期间的某个时间更新为 。我试图从线程内部尽可能多地获取更新的值。我见过这个类似的问题。
这是control.start_staging
功能。我在下面的摘录中分享了一些我尝试获取更新状态的不同方法作为评论:
文件:自动化/暂存/control.py
from app import db
from app.models import Project, Staging_Profile
from app.config import STAGING_DURATION_MINS
from datetime import datetime, timedelta
from time import sleep
def start_staging(project_id):
project = Project.query.get(project_id)
print(f"Received START for project {project.project_name}")
sp = project.staging_profile
sp.last_activity = datetime.utcnow()
db.session.commit()
status = sp.status
# Staging Loop Start
while True:
# This just serves as a force-stop if the job runs for more than STAGING_DURATION_MINUTES minutes.
if sp.last_activity + timedelta(minutes=STAGING_DURATION_MINS) > datetime.utcnow():
print(f"Status is: {sp.status}")
# ATTEMPT 1: does not get updated data
# status = sp.status
# ATTEMPT 2: does not get updated data
# status = Staging_Profile.query.get(project.staging_profile_id).status
# ATTEMPT 3: does not get updated data
all_profiles = db.session.query(Staging_Profile).all()
this_profile = [profile for profile in all_profiles if profile.id == sp.id][0]
if this_profile.status == 'STOPPED':
print("Status is STOPPED. Returning")
break
else:
print(f"Status is {this_profile.status}")
# Do work
do_some_stuff()
else:
break
sleep(5)
return
现在,真正令人费解的是,我可以从 executor 函数内部将数据写入数据库。sp.last_activity = datetime.utcnow()
后面的行db.session.commit()
成功写入线程启动的当前时间。
我的怀疑
我以非常模块化的方式构建了这个应用程序,我觉得这可能是问题的根源。
以下是我的应用程序结构的相关部分的概述:
app/
├─ __init__.py # This is where my db & executor are instantiated
├─ api.py # This is where the /api/staging/control route lives
├─ models.py # This holds my SQLAlchemy DB classes
├─ routes.py # This holds my regular front-end routes
├─ config.py # General config parameters
automation/
├─ Staging/
│ ├─ control.py # This is where the function passed to the executor is defined
│ ├─ __init__.py # Empty
├─ __init__.py # Empty
再次感谢。当我找到一个解决方案或解决方法时,我将发布此问题。
解决方案
推荐阅读
- algorithm - 拓扑排序卡恩算法 BFS 或 DFS
- javascript - 在 React 中覆盖元素类型
- python - django 在表中获取不同的记录似乎选择了随机行
- firebase-hosting - Firebase 这是设置缓存的正确方法
- python - Python subprocess.Popen() 找不到可执行文件
- python - Matplotlib,使用用户输入在循环中绘制什么
- c# - 如何通过移位将字节插入数组?
- sql - 如何计算低于 SQL 中第 10 个百分位的行数(及其百分比)?
- aws-amplify - 有没有办法在 Amplify 中的 lambda 函数之间共享环境变量和秘密?
- informix - 更改数据库 Informix 上的用户权限