首页 > 解决方案 > 如何在不中断正在运行的进程的情况下异步处理不同的(通过前端请求的)任务?

问题描述

我想实现以下设置,可能使用 Python/FastAPI 作为后端和 JavaScript 作为前端。到目前为止,我已经使用 fast-api/socketio 进行双向通信(在前端和后端之间)和 asyncio 用于协程的异步处理,但到目前为止我还没有找到令人满意的解决方案。

有人对以下问题有想法吗?使用前端,用户应该能够发送重新训练机器学习模型的请求。在进行再训练时,用户应该仍然能够例如从“旧”模型中获得预测,或者通过前端请求数据来可视化数据。再训练过程不应中断(时间过长),完成后应通知用户“新”模型已准备就绪。

请在下面找到我的方法,用于异步运行的两个任务,而不使用下面的 await asyncio.sleep()。
问题:(如何)这种带有 asyncio.gather 的方法将用于“持续检查 prediction_requests”的任务,并进一步返回预测结果?

from fastapi import FastAPI
from sklearn.neural_network import MLPClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from fastapi_socketio import SocketManager, socket_manager
import numpy as np
import pandas as pd
import asyncio
from datetime import date

app = FastAPI()

@app.get("/")
async def main():
        
        await asyncio.gather(
        asyncio.to_thread(trainModel),
        printNumber()
        )
    

def trainModel():
    print('train model startet')
    data = pd.read_csv('spam_data.csv')

    # Text Preprocessing
    import re # regex library
    print('before preprocessor')
    def preprocessor(text):
        text = re.sub('<[^>]*>', '', text) # Effectively removes HTML markup tags
        emoticons = re.findall('(?::|;|=)(?:-)?(?:\)|\(|D|P)', text)
        text = re.sub('[\W]+', ' ', text.lower()) + ' '.join(emoticons).replace('-', '')
        return text
    print('after preprocessor')

    # Train, Test Split
    print('start train, test split')
    from sklearn.model_selection import train_test_split
    X = data['Message'].apply(preprocessor)
    y = data['Category']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    
    # Training a Neural Network Pipeline
    from sklearn.pipeline import Pipeline
    from sklearn.linear_model import LogisticRegression
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics import accuracy_score, classification_report
    from sklearn.neural_network import MLPClassifier
    from sklearn.model_selection import cross_val_score
    print('start training a NN Pipeline')
    
    tfidf = TfidfVectorizer(strip_accents=None, lowercase=False, 
                            max_features=700, 
                            ngram_range=(1,1))
    neural_net_pipeline = Pipeline([('vectorizer', tfidf), 
                                    ('nn', MLPClassifier(hidden_layer_sizes=(700, 700)))])

    neural_net_pipeline.fit(X_train, y_train)
    print ('after fit')
    model = neural_net_pipeline
    print('model is trained')
    return model 


async def printNumber():
    for i in range(100):
        print(i)

其他方法依赖于使用 asyncio.sleep(),但我不想中断 trainModel() 进程并且没有 .sleep 和 .gather 两种方法一个接一个地运行。

@app.get("/")
async def main():
    task2 = asyncio.create_task(trainModel())
    task1 = asyncio.create_task(printNumber())
    model = await task2
    await task1

标签: pythonsocket.iopython-asynciocoroutinefastapi

解决方案


推荐阅读