首页 > 解决方案 > 如何让 Flask 网页(路由)在另一个网页(路由)上在后台运行

问题描述

所以我正在创建这个应用程序,其中一部分是一个网页,其中交易算法正在使用实时数据进行自我测试。一切正常,但问题是如果我离开(退出)网页,它就会停止。我想知道如何让它无限期地在后台运行,因为我希望算法继续这样做。

这是我想在后台运行的路线。

@app.route('/live-data-source')
def live_data_source():
    def get_live_data():
        live_options = lo.Options()
        while True:
            live_options.run()
            live_options.update_strategy()
            trades = live_options.get_all_option_trades()
            trades = trades[0]
            json_data = json.dumps(
                {'data': trades})
            yield f"data:{json_data}\n\n"
            time.sleep(5)

    return Response(get_live_data(), mimetype='text/event-stream')

我研究过多线程,但不太确定这是否适合这项工作。我对烧瓶还很陌生,所以这个问题很糟糕。如果您需要更多信息,请发表评论。

标签: pythonmultithreadingflaskwebsocket

解决方案


您可以通过以下方式进行 - 这是下面的 100% 工作示例。请注意,在生产中使用 Celery 来执行此类任务,或者自己编写另一个守护程序应用程序(另一个进程),并借助消息队列(例如 RabbitMQ)或借助通用数据库从 http 服务器向其提供任务。

如果对以下代码有任何疑问,请随时提出,这对我来说是一个很好的练习:

from flask import Flask, current_app
import threading
from threading import Thread, Event
import time
from random import randint

app = Flask(__name__)
# use the dict to store events to stop other treads
# one event per thread !
app.config["ThreadWorkerActive"] = dict()


def do_work(e: Event):
    """function just for another one thread to do some work"""
    while True:
        if e.is_set():
            break  # can be stopped from another trhead
        print(f"{threading.current_thread().getName()} working now ...")
        time.sleep(2)
    print(f"{threading.current_thread().getName()} was stoped ...")


@app.route("/long_thread", methods=["GET"])
def long_thread_task():
    """Allows to start a new thread"""
    th_name = f"Th-{randint(100000, 999999)}"  # not really unique actually
    stop_event = Event()  # is used to stop another thread
    th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)
    th.start()
    current_app.config["ThreadWorkerActive"][th_name] = stop_event
    return f"{th_name} was created!"


@app.route("/stop_thread/<th_id>", methods=["GET"])
def stop_thread_task(th_id):
    th_name = f"Th-{th_id}"
    if th_name in current_app.config["ThreadWorkerActive"].keys():
        e = current_app.config["ThreadWorkerActive"].get(th_name)
        if e:
            e.set()
            current_app.config["ThreadWorkerActive"].pop(th_name)
            return f"Th-{th_id} was asked to stop"
        else:
            return "Sorry something went wrong..."
    else:
        return f"Th-{th_id} not found"


@app.route("/", methods=["GET"])
def index_route():
    text = ("/long_thread - create another thread.  "
            "/stop_thread/th_id - stop thread with a certain id.  "
            f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")
    return text


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=9999)

推荐阅读