首页 > 解决方案 > 如何在类中使用 websocket 获取数据并实例化变量

问题描述

我试图让一个 websocket 连接在一个类中工作,因为我需要访问self.close类之外的变量。问题是连接没有建立,我也无法访问里面的变量on_message

我怎样才能让它工作?

import websocket
import json


class WS:

    def __init__(self):

        self.socket = 'wss://ftx.com/ws/'

    def stream(self):
        self.ws = websocket.WebSocketApp(self.socket,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close)
        self.ws.run_forever()

    def on_open(self):

        print('connected')

        data = {'op': 'subscribe', 'channel': 'ticker', 'market': 'ETH-PERP'}
        self.ws.send(json.dumps(data))

    def on_close(ws):
        print('disconnected')

    def on_message(self,message):

        json_msg = json.loads(message)
        self.close = json_msg['data']['last']

    def get_data_out(self):

        return self.close
   
    def on_error(ws,error):
        print(error)

标签: pythonwebsocket

解决方案


我试图运行你的代码

WS().stream()

但这对我不起作用(即使我添加了 missing on_open=self.on_open

我发现甚至 ftx文档及其示例也使用模块websocket,但我没有测试它。

但是,如果我使用不同的模块websockets(带 char s)而不是websocket(不带 char s),那么我可以订阅频道并获取值。


最小的工作示例

import asyncio
import websockets
import json

async def handler():
    async with websockets.connect('wss://ftx.com/ws/') as ws:
        
        # subscribe
        data = {'op': 'subscribe', 'channel': 'ticker', 'market': 'ETH-PERP'}
        await ws.send(json.dumps(data))

        # get all messages (not only with `update`)
        async for message in ws:
            print(message)

# --- main ---
     
asyncio.get_event_loop().run_until_complete(handler())

要仅获last取值,它需要过滤消息:

        async for message in ws:
            data = json.loads(message)
            if data['type'] == 'update':
                print(data['data']['last'])

但是如果你想将它用作类,它仍然需要编写更多的代码。


顺便提一句:

似乎websocket(不带 char s)是非常旧的模块,最后一次更新是在 2010 年,websockets(带 char s)是在 2021 年(几周/几个月前)更新的。

https://pypi.org/project/websocket/

https://pypi.org/project/websockets/


编辑:

在其他问题中,我找到了显示有关连接的更多信息的命令:

websocket.enableTrace(True)

这会显示错误,因为函数执行时使用的值比您定义的更多。
他们必须得到websocket作为第一个参数(之后self

你有:

def on_open   (self):
def on_close  (ws):
def on_message(self, message):
def on_error  (ws, error):

但它必须是

def on_open   (self, ws):
def on_close  (self, ws):
def on_message(self, ws, message):
def on_error  (self, ws, error):

你也忘了on_open=self.on_open


完整的工作代码

import websocket
import json

class WS:

    def __init__(self):
        self.socket = 'wss://ftx.com/ws/'
        self.close = None # default value at start

    def stream(self):
        self.ws = websocket.WebSocketApp(
                    self.socket,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open
                  )

        print('run forever')
        self.ws.run_forever()
        
    def on_open(self, ws):
        print('on_open:', ws)

        data = {'op': 'subscribe', 'channel': 'ticker', 'market': 'ETH-PERP'}
        
        self.ws.send(json.dumps(data))

    def on_close(self, ws):
        print('on_close:', ws)

    def on_message(self, ws, message):
        print('on_message:', ws, message)
        
        data = json.loads(message)

        if data['type'] == 'update':
            self.close = data['data']['last']

    def get_data_out(self):
        return self.close
   
    def on_error(self, ws, error):
        print('on_error:', ws, error)
        
# --- main ---

websocket.enableTrace(True)  # if you want to see more information  
                             # but code works also without this line

WS().stream()

推荐阅读