首页 > 解决方案 > 如何将 RxPy 数据流发送到前端 javascript

问题描述

我正在尝试将 python ReactiveX 流(使用 RxPy 库)发送到 Web UI 组件上的 javascript,但我似乎找不到这样做的方法。此外,我可能需要将进入 Javascript 的数据流放入各种 RxJS Observable 中以进行进一步处理。你能帮我理解如何实现这一目标吗?我仍然掌握 ReactiveX,所以也许我缺少一些基本概念,但我正在努力在网上找到类似的东西。

这个问题出现在我正在开发一个桌面应用程序时,该应用程序从 csv 或 zeromq 端点获取数据,并将其流式传输到将动态绘制数据的 UI(随着新数据的到来更新绘图)。我正在使用 Electron 构建我的应用程序,使用 python 作为我的后端代码。Python 是必须的,因为我将使用一些 TensorFlow 模型扩展应用程序。

多年来,作为初始结构的示例非常好,我编写了一些示例代码来玩,但我似乎无法让它工作。我设法从 UI 按钮一直到 python 脚本,但我陷入了 PriceApi.get_stream(...) 方法的返回。

索引.html

前端是笔直的。

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Electron Application</title>  
    </head>
    <body>
        <button id="super-button">Trigger Python Code</button>
        <div id="py-output">
        </div>
    </body>
    <script src="renderer.js" ></script>
</html>

api.py:

ZeroRPC 服务器文件与上述链接中的文件类似。

import gevent
import json
import signal
import zerorpc
from core_operator import stream


class PricesApi(object):

    def get_stream(self, filename):
        return stream(filename)

    def stop(self):
        print('Stopping strategy.')

    def echo(self, text):
        """echo any text"""
        return text


def load_settings():
    with open('settings.json') as json_settings:
        settings_dictionary = json.load(json_settings)
    return settings_dictionary


def main():
    settings = load_settings()
    s = zerorpc.Server(PricesApi())
    s.bind(settings['address'])
    print(f"Initialising server on {settings['address']}")
    s.run()


if __name__ == '__main__':
    main()

core_operator.py

这是主要逻辑将从 zeroMQ 订阅获取价格的文件,但目前只是从 csv 创建一个 Observable。

import sys
import rx
from csv import DictReader


def prepare_csv_timeseries_stream(filename):
    return rx.from_(DictReader(open(filename, 'r')))


def stream(filename):
    price_observable = prepare_csv_timeseries_stream(filename)
    return price_observable

渲染.js

最后,应该接收流的 javascript:

const zerorpc = require('zerorpc');
const fs = require('fs')

const settings_block = JSON.parse(fs.readFileSync('./settings.json').toString());
let client = new zerorpc.Client();
client.connect(settings_block['address']);

let button = document.querySelector('#super-button');
let pyOutput = document.querySelector('#py-output');
let filename = '%path-to-file%'
button.addEventListener('click', () => {
    let line_to_write = '1'
    console.log('button click received.')
    client.invoke('get_stream', filename, (error, result) => {
        var messages = pyOutput;
        message = document.createElement('li'),
        content = document.createTextNode(error.data);
        message.appendChild(content);
        messages.appendChild(message);

        if(error) {
            console.error(error);
        } else {
           var messages = pyOutput;
           message = document.createElement('li'),
           content = document.createTextNode(result.data);
           message.appendChild(content);
           messages.appendChild(message);    
        }
    })
})

我一直在研究使用 WebSockets,但未能理解如何实现它。我确实找到了一些使用 Tornado 服务器的示例,但是我试图使其尽可能纯净,而且,我已经有了来自 Electron 的客户端/服务器结构,我无法直接使用它,这感觉很奇怪。此外,我正在尝试将整个系统维护为 PUSH 结构,因为数据要求不允许 PULL 类型的模式,以及定期轮询等。

非常感谢您随时可以为此付出代价,如果您需要任何进一步的详细信息或解释,请告诉我。

标签: javascriptpythonelectronrx-py

解决方案


我通过使用一个名为 Eel 的惊人库(描述为“用于制作简单的类似 Electron 的 HTML/JS GUI 应用程序的小 Python 库”)找到了一个解决方案。它的绝对简单性和直观性使我能够通过几条简单的线条来实现我想要的。

  1. 按照介绍了解布局。
  2. 然后你的主要python文件(我方便地命名为main.py),你将流函数暴露给eel,所以它可以从JS文件中调用,并将流通过管道传递给从JS文件中暴露的JavaScript“receive_price”函数!
import sys
import rx
from csv import DictReader


def prepare_csv_timeseries_stream(filename):
    return rx.from_(DictReader(open(filename, 'r')))


def process_logic():
    return pipe(
        ops.map(lambda p: print(p)),  # just to view what's flowing through
        ops.map(lambda p: eel.receive_price(p)),  # KEY FUNCTION in JS file, exposed via eel, is called for each price. 
    )


@eel.expose  # Decorator so this function can get triggered from JavaScript
def stream(filename):
    price_observable = prepare_csv_timeseries_stream(filename)
    price_observable.pipe(process_logic()).subscribe()  # apply the pipe and subscribe to trigger stream

eel.init('web')
eel.start('main.html')  # look at how beautiful and elegant this is! 
  1. 现在我们创建 price_processing.js 文件(按照 Eel 说明放置在“web”文件夹中)以合并公开的函数
let button   = document.querySelector('#super-button');
let pyOutput = document.querySelector('#py-output'   );
let filename = '%path-to-file%'

console.log("ready to receive data!")

eel.expose(receive_price);  // Exposing the function to Python, to process each price
function receive_price(result) {
    var messages = pyOutput;
    message = document.createElement('li');
    content = document.createTextNode(result);
    message.appendChild(content);
    messages.appendChild(message);
    // in here you can add more functions to process data, e.g. logging, charting and so on..
};

button.addEventListener('click', () => {
    console.log('Button clicked magnificently! Bloody good job')
    eel.stream(filename); // calling the Python function exposed through Eel to start stream.
})
  1. 除了根据 Eel 文档和我们的 price_processing.js 文件更改脚本 refs: /eel.js 之外,HTML 几乎保持不变。
<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Let's try Eel</title>
    </head>
    <body>
        <h1>Eel-saved-my-life: the App!</h1>
        <button id="super-button">Trigger Python Code</button>
        <div id="py-output">

        </div>
    </body>
    <script type="text/javascript" src="/eel.js"></script>
    <script type="text/javascript" src="price_processing.js"></script>
</html>

我希望这可以帮助任何遇到同样问题的人。


推荐阅读