首页 > 解决方案 > 如何独立运行 websockets

问题描述

我尝试启动 Binance websocket 来收集 K 线(蜡烛图)数据。如果数据处理函数没有延迟,它工作得很好。但是,当处理某一个交易对数据的函数出现停顿时,其他交易对的响应也会被延迟。有人知道如何让它们彼此独立运行吗?

from binance.client import Client
from binance.websockets import BinanceSocketManager

# Credentials are left as empty strings here; supply real values before running.
api_key = ''
api_secret = ''
# Client built from the credentials; it is handed to the socket manager below.
client = Client(api_key, api_secret)
# Single socket manager shared by every stream opened in this snippet.
# NOTE(review): user_timeout=60 is presumably a keep-alive interval in
# seconds — confirm against the python-binance documentation.
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    """Print the symbol of a kline message, pausing 5 seconds for ETHUSDT.

    Args:
        msg: Mapping delivered to the socket callback; only ``msg['s']``
            (the symbol) is read.

    Raises:
        KeyError: If ``msg`` has no ``'s'`` entry.
    """
    # Function-scope import: this snippet never imports ``time`` at module
    # level, so the ETHUSDT branch would otherwise fail with NameError.
    import time

    symbol = msg['s']
    print(symbol)
    if symbol == 'ETHUSDT':
        # Artificial pause standing in for slow processing of one symbol.
        time.sleep(5)

def socket_1():
    """Open the 1h ETHUSDT kline stream on the shared manager ``bm``.

    Messages are delivered to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket``. It used to be bound
        to an unused local and discarded.
        NOTE(review): python-binance describes this as a connection key that
        can be passed to ``stop_socket`` — confirm for the installed version.
    """
    return bm.start_kline_socket('ETHUSDT', process, '1h')

def socket_2():
    """Open the 1h BNBUSDT kline stream on the shared manager ``bm``.

    Messages are delivered to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket``. It used to be bound
        to an unused local and discarded.
        NOTE(review): python-binance describes this as a connection key that
        can be passed to ``stop_socket`` — confirm for the installed version.
    """
    return bm.start_kline_socket('BNBUSDT', process, '1h')

# Register both kline streams on the shared manager ``bm`` ...
socket_1()
socket_2()

# ... then start the manager.
bm.start()

我按照 @Mike Malyi 的建议尝试使用 asyncio,让两个套接字的消息分别作为独立的任务来处理,但这并没有消除延迟:

import asyncio

def process(msg):
    """Socket callback: run the ``main`` coroutine for a single message."""
    coroutine = main(msg)
    # asyncio.run() only returns once the coroutine has finished.
    asyncio.run(coroutine)

async def main(msg):
    """Wrap ``process_message(msg)`` in a task and wait for it to finish.

    The symbol in ``msg['s']`` selects one of two branches; both schedule
    the same coroutine and await the resulting task straight away.
    """
    if msg['s'] != 'ETHUSDT':
        other_task = asyncio.create_task(process_message(msg))
        await other_task
    else:
        eth_task = asyncio.create_task(process_message(msg))
        await eth_task

async def process_message(msg):
    """Print the message's symbol; for ETHUSDT also sleep 5 seconds (async).

    Args:
        msg: Mapping with the symbol under key ``'s'``.
    """
    symbol = msg['s']
    print(symbol)
    if symbol != 'ETHUSDT':
        return
    # Non-blocking pause standing in for slow processing of this symbol.
    await asyncio.sleep(5)

# Both streams register the same ``process`` callback on the shared manager.
eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

# Start the manager once both streams are registered.
bm.start()

我还尝试在线程(threads)中使用 Queue,让该函数独立运行,但没有帮助,一个函数仍然会延迟另一个函数:

from queue import Queue

def consumer(in_q):
    """Loop forever, passing each item taken from ``in_q`` to ``process_message``.

    Args:
        in_q: Queue-like object; ``get()`` is called once per iteration.
    """
    while True:
        process_message(in_q.get())
    
def producer(out_q):
    """Open the ETHUSDT and BNBUSDT 1h kline streams on the shared ``bm``.

    Args:
        out_q: Queue-like object whose ``put`` method is registered as the
            callback for both streams.
    """
    for symbol in ('ETHUSDT', 'BNBUSDT'):
        bm.start_kline_socket(symbol, out_q.put, '1h')

def process_message(msg):
    """Print the message's symbol with a timestamp; ETHUSDT is delayed 5 s.

    Args:
        msg: Mapping with the symbol under key ``'s'``.

    Raises:
        KeyError: If ``msg`` has no ``'s'`` entry.
    """
    # Function-scope import: this snippet never imports ``time`` at module
    # level, so every call would otherwise fail with NameError.
    import time

    if msg['s'] == 'ETHUSDT':
        # Artificial pause standing in for slow processing of one symbol.
        time.sleep(5)
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
    else:
        print(f"{msg['s']} {time.strftime('%X')}")


# ``Thread`` is used below, so it has to be imported in this snippet.
from threading import Thread

# One queue shared by the socket callbacks (via ``producer``) and ``consumer``.
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

bm.start()

标签: python, asynchronous, websocket, python-asyncio, binance

解决方案


from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue

# Credentials are left as empty strings here; supply real values before running.
api_key = ''
api_secret = ''
# One client instance, reused by every socket manager created in build_thread.
client = Client(api_key, api_secret)

def process_message(msg):
    """Print the message's symbol with a timestamp; ETHUSDT then stalls 5 s.

    Args:
        msg: Mapping with the symbol under key ``'s'``.
    """
    if msg['s'] != 'ETHUSDT':
        print(f"{msg['s']} {time.strftime('%X')}")
        return
    print(f"{msg['s']} with delay, {time.strftime('%X')}")
    # Artificial pause standing in for slow processing of this symbol.
    time.sleep(5)
    print('delay end')
  

def build_thread(symbol):
    """Thread body: stream 1h klines for ``symbol`` and process them forever.

    Each call creates its own queue and its own socket manager, so messages
    for this symbol are consumed by the loop below and by nothing else.

    Args:
        symbol: Trading pair passed to ``start_kline_socket``.
    """
    print('start thread', symbol)
    inbox = queue.Queue()
    manager = BinanceSocketManager(client, user_timeout=60)
    # The socket callback only enqueues; processing happens in the loop below.
    manager.start_kline_socket(symbol, inbox.put, '1h')
    manager.start()
    while True:
        process_message(inbox.get())

# Launch one worker per symbol with ``threading.Thread`` and wait on them.
# Threads created through ``_thread.start_new_thread`` are not waited for when
# the main thread reaches the end of the script, so the workers could be
# killed before (or while) they run.
import threading

workers = [
    threading.Thread(target=build_thread, args=(symbol,), name=f'kline-{symbol}')
    for symbol in ('ETHUSDT', 'BNBUSDT')
]
for worker in workers:
    worker.start()
# build_thread loops forever, so these joins keep the main thread alive.
for worker in workers:
    worker.join()

推荐阅读