python - 如何独立运行 websockets
问题描述
我尝试启动 Binance websocket 来收集蜡烛数据。如果数据处理功能没有延迟,它工作得很好。但是当处理一个股票数据的函数发生一些暂停时,它也会延迟其他股票的响应。有人知道如何独立运行它们吗?
from binance.client import Client
from binance.websockets import BinanceSocketManager
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)
def process(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
time.sleep(5)
def socket_1():
conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')
def socket_2():
conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')
socket_1()
socket_2()
bm.start()
我尝试asyncio
按照@Mike Malyi 的建议让套接字运行两个单独的任务,但它并没有消除延迟:
import asyncio
def process(msg):
asyncio.run(main(msg))
async def main(msg):
if msg['s'] == 'ETHUSDT':
task1 = asyncio.create_task(process_message(msg))
await task1
else:
task2 = asyncio.create_task(process_message(msg))
await task2
async def process_message(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
await asyncio.sleep(5)
eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')
bm.start()
我还尝试使用Queue
in使该函数独立运行threads
,但没有帮助,一个函数仍然延迟另一个函数:
from queue import Queue
def consumer(in_q):
while True:
msg = in_q.get()
process_message(msg)
def producer(out_q):
eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')
def process_message(msg):
if msg['s'] == 'ETHUSDT':
time.sleep(5)
print(f"{msg['s']} with delay, {time.strftime('%X')}")
else:
print(f"{msg['s']} {time.strftime('%X')}")
q = Queue()
t1 = Thread(target = consumer, args =(q, ))
t2 = Thread(target = producer, args =(q, ))
t1.start()
t2.start()
bm.start()
解决方案
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
def process_message(msg):
if msg['s'] == 'ETHUSDT':
print(f"{msg['s']} with delay, {time.strftime('%X')}")
time.sleep(5)
print('delay end')
else:
print(f"{msg['s']} {time.strftime('%X')}")
def build_thread (symbol):
print('start thread', symbol)
q = queue.Queue()
bm = BinanceSocketManager(client, user_timeout=60)
conn_key = bm.start_kline_socket(symbol, q.put, '1h')
bm.start()
while(True):
msg = q.get()
process_message(msg)
thread.start_new_thread(build_thread, ('ETHUSDT', ))
thread.start_new_thread(build_thread, ('BNBUSDT', ))
推荐阅读
- python - 使用 Python 将 ADODB 记录集写入数据透视缓存
- javascript - Lerna 避免符号链接
- ssl - Tomcat - SSL 证书配置
- rest - Acumatica Web 服务 - 从通用查询中获取数据
- php - laravel reactjs 将所有 api 请求放在一个 get 请求中
- rhel - CentOS 8 中的屏幕共享
- ios - 快速将viewcontroller拖到tabbarcontroller时黑屏可见?
- java - Mockito - NullPointerException
- javascript - 推入具有数组数组的数组数组
- javascript - 如何从 html 页面启动 js 文件?