首页 > 解决方案 > 如何在 Python Web 应用程序中为状态更改通知生成服务器发送的事件?

问题描述

我有一个用 CherryPy 编写的网络应用程序:用户上传一个文件,然后开始一些冗长的操作,经过几个阶段。我希望将这些阶段的通知推送到所有连接的客户端。但我不知道如何在进程之间进行通信。我想我将不得不在一个单独的进程中启动冗长的操作,但是我不知道如何将“高级到第 N 阶段”消息传递给“服务器发送功能”。

从概念上讲,它会是这样的:

SSEtest.py:

from pathlib import Path
from time import sleep
import cherrypy


def lengthy_operation(name, stream):
    for stage in range(10):
        print(f'stage {stage}... ', end='')
        sleep(2)
        print('done')
    print('finished')


class SSETest():

    @cherrypy.expose
    def index(self):
        return Path('SSEtest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'

        def lengthy_operation():
            for stage in range(5):
                yield f'data: stage {stage}... \n\n'
                sleep(2)
                yield 'data: done\n\n'
            yield 'data: finished\n\n'

        return lengthy_operation()

    stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}


cherrypy.quickstart(SSETest())

SSEtest.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
    <form id="load_file_form" action="" enctype="multipart/form-data">
        <label for="load_file">Load a file: </label>
        <input type="file" id="load_file" name="load_file">
        <progress max="100" value="0" id="progress_bar"></progress>
    </form>
</div>

<div id="status_messages">
<h3>Stages:</h3>
</div>

<script>
    const load_file = document.getElementById('load_file');
    const progress_bar = document.getElementById('progress_bar');

    function update_progress_bar(event) {
        if (event.lengthComputable) {
            progress_bar.value = Math.round((event.loaded/event.total)*100);
        }
    }

    load_file.onchange = function (event) {
        let the_file = load_file.files[0];
        let formData = new FormData();
        let connection = new XMLHttpRequest();

        formData.append('file', the_file, the_file.name);

        connection.open('POST', 'upload', true);
        connection.upload.onprogress = update_progress_bar;
        connection.onload = function (event) {
            if (connection.status != 200) {
                alert('Error! ' + event);
            }
        };

        connection.send(formData);
    };

    const status_messages = document.getElementById("status_messages");
    const sse = new EventSource("stage");

    sse.onopen = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = "Connection established: " + event.type;
        status_messages.appendChild(new_message);
    };

    sse.onmessage = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = event.data;
        status_messages.appendChild(new_message);
    };

    sse.onerror = function(event) {
        let new_message = document.createElement("p");
        if (event.readyState == EventSource.CLOSED) {
            new_message.innerHTML = "Connections closed";
        } else {
            new_message.innerHTML = "Error: " + event.type;
        }
        status_messages.appendChild(new_message);
    };

</script>
</body>
</html>

当文件上传时,我只需要lengthy_operation()被调用一次。并将其生成的消息发送给所有客户端。现在它可以与本地功能一起使用,这不是我想要的。如何使用外部函数并将其消息传递给stage()方法?

标签: pythoncherrypyserver-sent-events

解决方案


我希望将这些阶段的通知推送到所有连接的客户端。

我怀疑最终你会想要更多的控制权,但我会按照你的要求回答你的问题。稍后,您可能希望在以下示例的基础上构建并根据用户的会话、或基于某个起始时间戳或其他一些相关概念来过滤广播通知。

每个“连接的客户端”实际上都挂在一个长时间运行的请求上/stage,服务器将使用该请求将事件流式传输到客户端。在您的示例中,每个客户端将立即开始该请求并使其保持打开状态,直到服务器终止流。您还可以close()使用EventSource.

基本解决方案

您询问了如何让/stage处理程序将其事件广播或镜像到所有当前连接的客户端。有很多方法可以实现这一点,但简而言之,您希望lengthy_operation函数将事件发布到所有/stage处理程序读取器或发送到所有处理程序读取的持久共享位置/stage。我将展示一种封装上述第一个想法的方法。

考虑一个序列化为的通用流事件类data: <some message>

class StreamEvent:
    def __init__(self, message: str) -> bytes:
        self.message = message

    def serialize(self) -> str:
        return f'data: {self.message}\n\n'.encode('utf-8')

以及文件相关流事件的更具体派生案例:

class FileStreamEvent(StreamEvent):
    def __init__(self, message: str, name: str):
        super().__init__(message)
        self.name = name

    def serialize(self) -> bytes:
        return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')

您可以创建一个极其原始的发布/订阅类型的容器,/stage然后可以在其中订阅侦听器并将实例lengthy_operation()发布到所有侦听器:StreamEvent

class StreamSource:
    def __init__(self):
        self.listeners: List[Queue] = []

    def put(self, event: StreamEvent):
        for listener in self.listeners:
            listener.put_nowait(event)

    def get(self):
        listener = Queue()
        self.listeners.append(listener)
        try:
            while True:
                event = listener.get()
                yield event.serialize()
        finally:
            self.listeners.remove(listener)

StreamSource.get()中,您可能希望创建一个结束情况(例如检查“关闭”或“完成”事件)以退出泛型while True,并且您可能希望在阻塞Queue.get()调用上设置超时。但是为了这个例子,我保持一切基本。

现在,lengthy_operation()只需要引用 a StreamSource

def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
    for stage in range(10):
        events.put(FileStreamEvent(f'stage {stage}: begin', name))
        sleep(2)
        events.put(FileStreamEvent(f'stage {stage}: end', name))
    events.put(FileStreamEvent('finished', name))

SSETestStreamSource然后可以为每个lengthy_operation()调用提供一个共享实例,并SSETest.stage()可以用来StreamSource.get()在这个共享实例上注册一个监听器:

class SSETest:
    _stream_source: StreamSource = StreamSource()

    @cherrypy.expose
    def index(self):
        return Path('SSETest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(self._stream_source, name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Cache-Control'] = 'no-cache'
        cherrypy.response.headers['Content-Type'] = 'text/event-stream'
        def stream():
            yield from self._stream_source.get()
        return stream()

    stage._cp_config = {'response.stream': True}

这是一个完整的[1]示例,说明了如何解决您的直接问题,但您很可能希望在您更接近您可能想到的最终用户体验时对其进行调整。

[1]:为了便于阅读,我省略了导入,所以它们是:

from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from time import sleep
from typing import BinaryIO, List
import cherrypy

后续退出条件

由于您使用的是cherrypy.quickstart(),在上面的最小可行解决方案中,您将不得不强制退出SSETest服务,因为我没有为您假设任何优雅的“停止”行为。第一个解决方案明确指出了这一点,但为了便于阅读,没有提供解决方案。

让我们看一下提供一些初始优雅“停止”条件的几种方法:

添加停止条件StreamSource

首先,至少给StreamSource. 例如,添加一个running允许StreamSource.get() while循环优雅退出的属性。接下来,设置合理的Queue.get()超时时间,以便循环可以running在处理消息之间定期测试此属性。接下来,确保至少有一些相关的 CherryPy 总线消息触发此停止行为。下面,我已将所有这些行为纳入StreamSource类,但您也可以注册一个单独的应用程序级 CherryPy 插件来处理调用StreamSource.stop()而不是制作StreamSource插件。当我添加一个单独的信号处理程序时,我将演示它的样子。

class StreamSource(plugins.SimplePlugin):
    def __init__(self, bus: wspbus.Bus):
        super().__init__(bus)
        self.subscribe()
        self.running = True
        self.listeners: List[Queue] = []

    def graceful(self):
        self.stop()

    def exit(self):
        self.stop()

    def stop(self):
        self.running = False

    def put(self, event: StreamEvent):
        for listener in self.listeners:
            listener.put_nowait(event)

    def get(self):
        listener = Queue()
        self.listeners.append(listener)
        try:
            while self.running:
                try:
                    event = listener.get(timeout=1.0)
                    yield event.serialize()
                except Empty:
                    pass
        finally:
            self.listeners.remove(listener)

现在,SSETest将需要StreamSource使用总线值进行初始化,因为该类现在是SimplePlugin

    _stream_source: StreamSource = StreamSource(cherrypy.engine)

你会发现这个解决方案让你在用户体验方面更接近你可能想要的。发出一个键盘中断,CherryPy 将开始停止系统,但是第一个优雅的键盘中断不会发布stop消息,因为你需要发送第二个键盘中断。

添加 SIGINT 处理程序以捕获键盘中断

由于cherrypy.quickstart使用信号处理程序的方式,您可能希望将SIGINT处理程序注册为与 CherryPy 兼容的插件,以便在第一次键盘中断时SignalHandler优雅地停止。StreamSource

这是一个例子:

class SignalHandler(plugins.SignalHandler):
    def __init__(self, bus: wspbus.Bus, sse):
        super().__init__(bus)
        self.handlers = {
            'SIGINT': self.handle_SIGINT,
        }
        self.sse = sse

    def handle_SIGINT(self):
        self.sse.stop()
        raise KeyboardInterrupt()

请注意,在这种情况下,我将演示一个通用应用程序级处理程序,然后您可以通过更改启动cherrypy.quickstart()逻辑来配置和初始化,如下所示:

sse = SSETest()
SignalHandler(cherrypy.engine, sse).subscribe()
cherrypy.quickstart(sse)

对于此示例,我公开了一个通用应用程序SSETest.stop方法来封装所需的行为:

class SSETest:
    _stream_source: StreamSource = StreamSource(cherrypy.engine)

    def stop(self):
        self._stream_source.stop()

总结分析

不是CherryPy 用户,我昨天才第一次开始查看它只是为了回答您的问题,所以我将把“CherryPy 最佳实践”留给您自行决定。

实际上,您的问题是以下 Python 问题的非常通用的组合:

  1. 如何实现简单的发布/订阅模式?(用 回答Queue);
  2. 如何为订阅者循环创建退出条件?Queue.get()(用'stimeout参数和running属性回答)
  3. 如何通过键盘中断影响退出条件?(用 CherryPy 特定的信号处理程序回答,但这只是基于您在 Python 的内置signal模块中可以找到的概念)

您可以通过多种方式解决所有这些问题,其中一些更倾向于通用的“Pythonic”解决方案(我更喜欢它是有意义的),而另一些则利用以 CherryPy 为中心的概念(这在您想要增强 CherryPy 行为而不是而不是重写或破坏它)。

例如,您可以使用 CherryPy 总线消息来传达流消息,但对我来说,这会使您的应用程序逻辑在 CherryPy 特定功能中过多地纠缠,所以我可能会找到一个中间地带,您可以通用地处理您的应用程序功能(所以如我的StreamSource示例如何使用标准 PythonQueue模式所示,不要将自己与 CherryPy 联系在一起。您可以选择制作StreamSource一个插件,以便它可以直接响应某些 CherryPy 总线消息(如上所示),或者您可以拥有一个知道调用相关应用程序特定域的单独插件,例如StreamSource.stop()(类似于我用SignalHandler) 显示。

最后,你所有的问题都很好,但它们之前都可能在 SO 上作为通用 Python 问题得到了回答,所以当我在这里将答案与你的 CherryPy 问题空间联系起来时,我也想帮助你(和未来的读者)意识到如何在 CherryPy 之外更抽象地思考这些特殊问题。


推荐阅读