python - Python:测量等待 ws.recv() 的 websocket 队列长度
问题描述
我正在消耗低延迟的市场数据,并且我正在尝试测量我可以消耗多少流,而不会由于 websocket 消息队列的建立而导致我的代码变慢。我的理解是,消息由 websocket 接收并排队,直到 ws.recv() 被调用,它按接收顺序一次处理一个。在正常情况下,我的代码绝对足够快来处理消息,但是当消息突然出现时,我会想象队列已满。我希望队列只会被填满 5 或 10 毫秒,但我知道这一点非常重要。有没有办法测量队列中有多少消息正在等待?
我附上了我用于上下文的代码片段,但相关部分只是循环
data = self.ws.recv()
class WebsocketClient(object):
def __init__(
self,
url=""
products=None,
message_type="subscribe",
should_print=True,
self.url = url
self.products = products
self.channels = channels
self.type = message_type
self.stop = True
self.error = None
self.ws = None
self.thread = None
self.auth = auth
self.api_key = api_key
self.api_secret = api_secret
self.api_passphrase = api_passphrase
self.should_print = should_print
def start(self):
def _go():
self._connect()
self._listen()
self._disconnect()
self.stop = False
self.on_open()
self.thread = Thread(target=_go)
self.keepalive = Thread(target=self._keepalive)
self.thread.start()
def _connect(self):
if self.products is None:
self.products = []
elif not isinstance(self.products, list):
self.products = [self.products]
if self.url[-1] == "/":
self.url = self.url[:-1]
if self.channels is None:
self.channels = [{"name": "ticker", "product_ids": [product_id for product_id in self.products]}]
sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}
else:
sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}
if self.auth:
#timestamp = int(time.time())
#message = timestamp + 'GET' + '/users/self/verify'
auth_headers = get_auth_headers('/users/self/verify','GET','')
#print(auth_headers)
sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
sub_params['key'] = auth_headers['CB-ACCESS-KEY']
sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']
try:
self.ws = create_connection(self.url)
self.ws.send(json.dumps(sub_params))
except:
traceback.print_exc()
self.stop = True
def _keepalive(self, interval=10):
while self.ws.connected:
self.ws.ping("keepalive")
time.sleep(interval)
def _listen(self):
self.keepalive.start()
while not self.stop:
try:
data = self.ws.recv()
msg = json.loads(data)
except ValueError as e:
self.on_error(e)
except Exception as e:
self.on_error(e)
else:
self.on_message(msg)
def _disconnect(self):
try:
if self.ws:
self.ws.close()
except WebSocketConnectionClosedException as e:
pass
finally:
self.keepalive.join()
self.on_close()
def close(self):
self.stop = True # will only disconnect after next msg recv
self._disconnect() # force disconnect so threads can join
self.thread.join()
def on_open(self):
if self.should_print:
print("-- Subscribed! --\n")
def on_close(self):
if self.should_print:
print("\n-- Socket Closed --")
def on_message(self, msg):
*** my logic ***
def on_error(self, e, data=None):
self.error = e
self.stop = True
print('{} - data: {}'.format(e, data))
解决方案
推荐阅读
- xcode - 转换 Xcode 视图层次结构 RGB 颜色
- r - 使用 dplyr 一周中所有日子的平均骑手人数
- javascript - 在 jpeg 上添加半透明的纯色 bg
- javascript - 使用套接字或 REST 在特定时间间隔内获取多个硬币的平均交易量
- mysql - MySql 表关系
- html - 如何修复 pre 和 code 标签上的缩进?
- python - Django Rest Framework:一个项目,多个应用程序,每个应用程序一台服务器?
- php - 通过 PHP 上传文件时完全忽略 .htaccess 参数
- reactjs - Flask-CORS 后端(在 Heroku 上)返回 HTML 文档而不是 JSON
- excel - VBA 帮助 - 如果一列包含特定字母,则将另一列设为负数,然后循环