首页 > 解决方案 > Flask 将 EventStreams 发送到 HTML 和 JS 获取事件消息

问题描述

我已经构建了一个代码,将 GEO 数据发送到 Kafka 系统,然后在 Flask 应用程序中被拾取并流式传输到单独的页面中。我希望每秒更新一次的实时流被javascript拾取并在地图上放置地理点。当我运行脚本时,它似乎识别了事件源,但是当我想打印出实时数据时,它没有找到任何实时数据。

var mymap = L.map('mapid').setView([51.505, -0.09], 13);
    
L.tileLayer('https://api.mapbox.com/styles/v1/{id}/tiles/{z}/{x}/{y}?access_token={accessToken}', {
    attribution: 'Map data &copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors, Imagery © <a href="https://www.mapbox.com/">Mapbox</a>',
    maxZoom: 18,
    id: 'mapbox/streets-v11',
    tileSize: 512,
    zoomOffset: -1,
    accessToken: '[redacted]'
}).addTo(mymap);
    
var source = new EventSource('/topic/London');
source.addEventListener('message', function(e) {
    var obj = JSON.parse(e.data);
    console.log(obj);
}, false);

这是每秒流式传输的数据的屏幕截图: 在此处输入图像描述

但它没有被 JavaScript 返回到控制台 在此处输入图像描述

我试图通过添加一些额外的代码来查看正在触发的事件来调试代码。当代码运行时,它向我显示连接是“打开”并且 EventState.readyState 是 1 表示它的打开:

ssEvent = new EventSource( '/topic/London' );
ssEvent.onopen = function (evt) {
  console.log('Open');
}
ssEvent.onerror = function (evt) {
  console.log('Error');
}
ssEvent.onmessage = function (evt) {
  console.log('Message');
}

在此处输入图像描述

这是我的 Flask 应用程序代码:

  from flask import Flask
  from flask import render_template
  from flask import Response
  from pykafka import KafkaClient

  app = Flask(__name__)

  def kafkaClient():
      return KafkaClient(hosts = 'localhost:9092')

  @app.route('/')
  def index():
      return render_template('index.html')


  @app.route('/topic/<topicname>')
    def get_message(topicname):
      client = kafkaClient()
      def events():
          for i in client.topics[topicname].get_simple_consumer():
              yield 'data{0}\n\n'.format(i.value.decode())
      return Response(events(), mimetype= 'text/event-stream')

  if __name__ == '__main__':
    app.run(debug = True, port = 5001)

标签: javascriptjqueryflask

解决方案


我通过进入 App.py 文件并添加到使用 yield 的函数并添加了我想要流式传输的事件名称和数据来解决了这个问题。

更新的 Flask 应用程序如下所示:

 @app.route('/topic/<topicname>', methods=['GET'])
def get_message(topicname):
    client = kafkaClient()
    def events():
        for i in client.topics[topicname].get_simple_consumer():
            yield "event: {0}\ndata: {1}\n\n".format("listen", i.value.decode())
    return Response(events(), mimetype= 'text/event-stream')

推荐阅读