首页 > 解决方案 > 流式传输实时 Websocket 流数据并在类中使用破折号更新绘图图

问题描述

抱歉,如果这是一个愚蠢的问题,但我想知道是否可以创建一个破折号应用程序并将实时数据直接流式传输到图表中?

我一直在研究一个包含许多元素的交易机器人。我目前正在尝试从 websocket 流式传输每 2000 毫秒更新一次的数据,并将其直接馈送到图表中。

我对编程比较陌生,因此可以在必要时使用一些帮助和批评!

因此,由于这个实时图表最终将作为 GUI 和机器人功能的一部分合并,我试图让这个 priceTicker 类既包含初始化 dash 应用程序,又包含使用接收到的 websocket 流实时更新图表。

我能够毫无问题地流式传输数据,并且可以将数据解析为 json、csv 或其他任何东西。我还可以在仪表板应用程序中初始化图表,但它不会用新数据更新图表。

我已经查看了一般区域周围的一些 SO 帖子,但不幸的是没有找到任何可以解释我的情况的帖子 - 抱歉,如果有任何帖子我错过了!

以下是我的完整代码:

from binance.client import Client
from binance.websockets import BinanceSocketManager
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input
import plotly
import plotly.graph_objs as go
from collections import deque
import pandas as pd
import json

class priceTicker:

    def __init__(self, api_key, secret_key):
        self.app = dash.Dash(__name__)
        self.ticker_time = deque(maxlen=200)
        self.last_price = deque(maxlen=200)
        self.app.layout = html.Div(
            [
                dcc.Graph(id = 'live-graph', animate = True),
                dcc.Interval(
                    id = 'graph-update',
                    interval = 2000,
                    n_intervals = 0
                )
            ]
        )

        client = Client(api_key, secret_key)
        socket = BinanceSocketManager(client)
        socket.start_kline_socket('BTCUSDT', self.on_message, '1m')

        socket.start()
    def on_message(self, message):
        app = self.app
        price = {'time':message['E'], 'price':message['k']['c']}
        self.ticker_time.append(message['E'])
        self.last_price.append(message['k']['c'])
        ticker_time = self.ticker_time
        last_price = self.last_price
        #print(self.ticker_time, self.last_price)

        @app.callback(
            Output('live-graph', 'figure'),
            [Input('graph-update', 'n_intervals')])
        def update_graph(ticker_time,last_price):

            data = go.Scatter(
                x = list(ticker_time),
                y = list(last_price),
                name ='Scatter',
                mode = 'lines+markers'
            )


            return {'data': [data],
                    'layout': go.Layout(xaxis=dict(range=[min(ticker_time), max(ticker_time)]),
                    yaxis=dict(range=[min(last_price), max(last_price)]),)}

def Main():
    api_key = ''
    secret_key = ''
    ticker = priceTicker(api_key, secret_key)
    ticker.app.run_server(debug=True, host='127.0.0.1', port=16552)
if __name__ == '__main__':
    #app.run_server(debug=False, host='127.0.0.1', port=10010)
    Main()

我不确定我哪里出错了,但似乎没有调用回调函数。对不起,如果我错过了一些明显的东西!

我在 3.7.6 和 macos Big Sur 上运行

如果有人有任何建议,将不胜感激。

干杯

标签: python-3.xplotly-dashbinance

解决方案


对于任何有兴趣或试图对我做类似事情的人。

我设法通过在init中调用 app.callback并将回调分配给类中的函数来解决问题:

from binance.client import Client
from binance.websockets import BinanceSocketManager
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input
import plotly
import plotly.graph_objs as go
from collections import deque
import pandas as pd
import json

class priceTicker:
    def __init__(self, api_key, secret_key):
        self.ticker_time = deque(maxlen=200)
        self.last_price = deque(maxlen=200)
        client = Client(api_key, secret_key)
        socket = BinanceSocketManager(client)
        socket.start_kline_socket('BTCUSDT', self.on_message, '1m')

        socket.start()
        self.app = dash.Dash()
        self.app.layout = html.Div(
            [
                dcc.Graph(id = 'live-graph', animate = True),
                dcc.Interval(
                    id = 'graph-update',
                    interval = 2000,
                    n_intervals = 0
                )
            ]
        )
        self.app.callback(
            Output('live-graph', 'figure'),
            [Input('graph-update', 'n_intervals')])(self.update_graph)
        #app.run_server(debug=True, host='127.0.0.1', port=16452)
    def on_message(self, message):

        #price = {'time':message['E'], 'price':message['k']['c']}
        self.ticker_time.append(message['E'])
        self.last_price.append(message['k']['c'])
        #print(self.ticker_time, self.last_price)

    def update_graph(self, n):

        data = go.Scatter(
            x = list(self.ticker_time),
            y = list(self.last_price),
            name ='Scatter',
            mode = 'lines+markers'
        )


        return {'data': [data],
                'layout': go.Layout(xaxis=dict(range=[min(self.ticker_time), max(self.ticker_time)]),
                yaxis=dict(range=[min(self.last_price), max(self.last_price)]),)}

def Main():
    api_key = ''
    secret_key = ''
    ticker = priceTicker(api_key, secret_key)
    ticker.app.run_server(debug=True, host='127.0.0.1', port=16452)
if __name__ == '__main__':
    #app.run_server(debug=False, host='127.0.0.1', port=10010)
    Main()

应用程序每 2 秒更新一次最新价格,输出与预期一致。

干杯


推荐阅读