python - 如何在不中断正在运行的进程的情况下异步处理不同的(通过前端请求的)任务?
问题描述
我想实现以下设置,可能使用 Python/FastAPI 作为后端和 JavaScript 作为前端。到目前为止,我已经使用 fast-api/socketio 进行双向通信(在前端和后端之间)和 asyncio 用于协程的异步处理,但到目前为止我还没有找到令人满意的解决方案。
有人对以下问题有想法吗?使用前端,用户应该能够发送重新训练机器学习模型的请求。在进行再训练时,用户应该仍然能够例如从“旧”模型中获得预测,或者通过前端请求数据来可视化数据。再训练过程不应中断(时间过长),完成后应通知用户“新”模型已准备就绪。
请在下面找到我的方法,用于异步运行的两个任务,而不使用下面的 await asyncio.sleep()。
问题:(如何)这种带有 asyncio.gather 的方法将用于“持续检查 prediction_requests”的任务,并进一步返回预测结果?
from fastapi import FastAPI
from sklearn.neural_network import MLPClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from fastapi_socketio import SocketManager, socket_manager
import numpy as np
import pandas as pd
import asyncio
from datetime import date
app = FastAPI()
@app.get("/")
async def main():
await asyncio.gather(
asyncio.to_thread(trainModel),
printNumber()
)
def trainModel():
print('train model startet')
data = pd.read_csv('spam_data.csv')
# Text Preprocessing
import re # regex library
print('before preprocessor')
def preprocessor(text):
text = re.sub('<[^>]*>', '', text) # Effectively removes HTML markup tags
emoticons = re.findall('(?::|;|=)(?:-)?(?:\)|\(|D|P)', text)
text = re.sub('[\W]+', ' ', text.lower()) + ' '.join(emoticons).replace('-', '')
return text
print('after preprocessor')
# Train, Test Split
print('start train, test split')
from sklearn.model_selection import train_test_split
X = data['Message'].apply(preprocessor)
y = data['Category']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Training a Neural Network Pipeline
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, classification_report
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
print('start training a NN Pipeline')
tfidf = TfidfVectorizer(strip_accents=None, lowercase=False,
max_features=700,
ngram_range=(1,1))
neural_net_pipeline = Pipeline([('vectorizer', tfidf),
('nn', MLPClassifier(hidden_layer_sizes=(700, 700)))])
neural_net_pipeline.fit(X_train, y_train)
print ('after fit')
model = neural_net_pipeline
print('model is trained')
return model
async def printNumber():
for i in range(100):
print(i)
其他方法依赖于使用 asyncio.sleep(),但我不想中断 trainModel() 进程并且没有 .sleep 和 .gather 两种方法一个接一个地运行。
@app.get("/")
async def main():
task2 = asyncio.create_task(trainModel())
task1 = asyncio.create_task(printNumber())
model = await task2
await task1
解决方案
推荐阅读
- java - 在 MainActivity 中更改语言环境有效,但在 Fragment 中更改它会产生 Nullpointerexception
- excel - 使用 sub for excel 从网站提取 URL 的问题
- r - 使用 ls() 模式为子图创建图列表
- c# - 控制器没有调用 SignalR
- xamarin - 如何在没有后退按钮的 Xamarin Shell 中切换页面?
- javascript - SnowflakeUtil.generate(A_NUMBER) 返回字符串类型
- python - 使用 Python Flask 在网页上显示时间
- linux-kernel - Linux 中硬件 GPIO 看门狗的设备树问题
- checksum - 如何计算以太网帧中的 FCS 字段?
- c++ - 如何在 C++ 中使用构造函数初始化结构列表