首页 > 解决方案 > Python:测量等待 ws.recv() 的 websocket 队列长度

问题描述

我正在消耗低延迟的市场数据,并且我正在尝试测量我可以消耗多少流,而不会由于 websocket 消息队列的建立而导致我的代码变慢。我的理解是,消息由 websocket 接收并排队,直到 ws.recv() 被调用,它按接收顺序一次处理一个。在正常情况下,我的代码绝对足够快来处理消息,但是当消息突然出现时,我会想象队列已满。我希望队列只会被填满 5 或 10 毫秒,但我知道这一点非常重要。有没有办法测量队列中有多少消息正在等待?

我附上了我用于上下文的代码片段,但相关部分只是循环 data = self.ws.recv()

class WebsocketClient(object):
    def __init__(
            self,
            url=""
            products=None,
            message_type="subscribe",
            should_print=True,

        self.url = url
        self.products = products
        self.channels = channels
        self.type = message_type
        self.stop = True
        self.error = None
        self.ws = None
        self.thread = None
        self.auth = auth
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.should_print = should_print
        

    def start(self):
        def _go():
            self._connect()
            self._listen()
            self._disconnect()

        self.stop = False
        self.on_open()
        self.thread = Thread(target=_go)
        self.keepalive = Thread(target=self._keepalive)
        self.thread.start()

    def _connect(self):
        if self.products is None:
            self.products = []
        elif not isinstance(self.products, list):
            self.products = [self.products]

        if self.url[-1] == "/":
            self.url = self.url[:-1]

        if self.channels is None:
            self.channels = [{"name": "ticker", "product_ids": [product_id for product_id in self.products]}]
            sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}
        else:
            sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}

        if self.auth:
            #timestamp = int(time.time())
            #message = timestamp + 'GET' + '/users/self/verify'
            auth_headers = get_auth_headers('/users/self/verify','GET','')
            #print(auth_headers)
            sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
            sub_params['key'] = auth_headers['CB-ACCESS-KEY']
            sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
            sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']

        try:
            self.ws = create_connection(self.url)
            self.ws.send(json.dumps(sub_params))
            
        except:
            traceback.print_exc()
            self.stop = True

    def _keepalive(self, interval=10):
        while self.ws.connected:
            self.ws.ping("keepalive")
            time.sleep(interval)

    def _listen(self):
        self.keepalive.start()
        while not self.stop:
            try:
                data = self.ws.recv()
                msg = json.loads(data)
            except ValueError as e:
                self.on_error(e)
            except Exception as e:
                self.on_error(e)
            else:
                self.on_message(msg)

    def _disconnect(self):
        try:
            if self.ws:
                self.ws.close()
        except WebSocketConnectionClosedException as e:
            pass
        finally:
            self.keepalive.join()

        self.on_close()

    def close(self):
        self.stop = True   # will only disconnect after next msg recv
        self._disconnect() # force disconnect so threads can join
        self.thread.join()

    def on_open(self):
        if self.should_print:
            print("-- Subscribed! --\n")

    def on_close(self):
        if self.should_print:
            print("\n-- Socket Closed --")

    def on_message(self, msg):
        *** my logic ***
        
    def on_error(self, e, data=None):
        self.error = e
        self.stop = True
        print('{} - data: {}'.format(e, data))

标签: pythonwebsocket

解决方案


推荐阅读