python - 无法使用 Python 类结构/将函数转换为类结构获取队列值
问题描述
我编写了一个面向对象的 Python 脚本,用来检索和处理 Binance 的实时行情(tick)数据。该类把 tick 数据存入一个 Queue;为了进一步处理,同一个类中的 get_queue() 方法会从这个 Queue 中取出数据。这是我的代码:
import websocket, json
from datetime import datetime
import threading
import time
import queue
class stream:
    """Collect Binance ticker updates from a websocket into a queue.

    ``run()`` starts the websocket client on a daemon thread and then
    blocks; every message received is formatted by ``on_message`` and put
    on ``event_queue``; ``get_queue()`` drains that queue to stdout.
    """

    # Combined ticker stream endpoint opened by run().
    URL = "wss://stream.binance.com:9443/ws/btcusdt@ticker/ethbtc@ticker/bnbbtc@ticker/wavesbtc@ticker/stratbtc@ticker/ethup@ticker/yfiup@ticker/xrpup@ticker"

    def __init__(self, event_queue):
        """Remember the queue that receives one formatted string per tick."""
        self.event_queue = event_queue

    def on_message(self, ws, message):
        """Parse one JSON ticker payload and enqueue a formatted line.

        Args:
            ws: The websocket app that delivered the message (unused).
            message: JSON text with the keys 'E' (event time in
                milliseconds), 's', 'o', 'h', 'l', 'c', 'v' and 'n'.

        Raises:
            KeyError: If any of the keys above is missing.
        """
        data = json.loads(message)
        # 'E' is divided by 1000 to get seconds. time.gmtime produces the
        # same UTC fields as the deprecated datetime.utcfromtimestamp.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(data['E'] / 1000))
        symbol = data['s']
        # Locals are suffixed so that `open` does not shadow the builtin.
        open_price = data['o']
        high_price = data['h']
        low_price = data['l']
        close_price = data['c']
        volume = data['v']
        trade_count = data['n']  # No. of trades
        tick = f'tick :timestamp: {timestamp} :symbol: {symbol} :close_price: {close_price} :volume: {volume}:open_price: {open_price}:high_price: {high_price}:low_price: {low_price}:trade_qyt: {trade_count}'
        self.event_queue.put(tick)

    def on_close(self, ws, message=None, *extra):
        """Report that the connection was closed.

        The number of positional arguments passed to the close callback
        differs between websocket-client releases, so everything after
        ``ws`` is optional and ignored.
        """
        print("bang")

    def _is_connected(self):
        """Return True once the client has a socket that reports connected."""
        # Read the attribute once: the websocket thread may reset it to
        # None between a None check and the `.connected` access.
        sock = self.socket.sock
        return sock is not None and sock.connected

    def run(self):
        """Start the websocket client thread and block while it is open."""
        self.socket = websocket.WebSocketApp(
            self.URL,
            on_message=self.on_message,
            on_close=self.on_close)
        self.wst = threading.Thread(target=self.socket.run_forever)
        self.wst.daemon = True
        self.wst.start()

        # `sock` may still be None right after start(), so the connected
        # check must tolerate None. Stop waiting if the worker thread has
        # already died (e.g. the connection attempt failed).
        while self.wst.is_alive() and not self._is_connected():  # and conn_timeout
            print("this")
            time.sleep(1)
        while self.socket.sock is not None:
            print("that")
            time.sleep(10)

    def get_queue(self):
        """Print every queued tick forever, blocking while the queue is empty."""
        while True:
            print("This is this: ", self.event_queue.get(True))
if __name__ == "__main__":
    # Wire a shared queue between the websocket producer and the printer.
    message_queue = queue.Queue()
    # Use a distinct name so the instance does not rebind the class name.
    ticker_stream = stream(event_queue=message_queue)
    # Daemon consumer: it must not keep the process alive on its own.
    consumer = threading.Thread(target=ticker_stream.get_queue, daemon=True)
    consumer.start()
    # Blocks the main thread while the websocket connection is open.
    ticker_stream.run()
使用面向对象的方法(如上所示)时,我的 Queue 最终没有向终端输出任何值。然而,使用过程式的写法(如下所示),我得到了想要的结果。下面是过程式的实现:
import websocket, json
from datetime import datetime
import threading
import time
import queue
# Queue shared by the websocket callback (producer) and get_queue (consumer).
events = queue.Queue()
# Combined ticker stream endpoint; a plain literal, since there is nothing to interpolate.
socket = 'wss://stream.binance.com:9443/ws/btcusdt@ticker/ethbtc@ticker/bnbbtc@ticker/wavesbtc@ticker/stratbtc@ticker/ethup@ticker/yfiup@ticker/xrpup@ticker'
def on_message(ws, message):
    """Parse one JSON ticker payload and put a formatted line on `events`.

    Args:
        ws: The websocket app that delivered the message (unused).
        message: JSON text with the keys 'E' (event time in milliseconds),
            's', 'o', 'h', 'l', 'c', 'v' and 'n'.

    Raises:
        KeyError: If any of the keys above is missing.
    """
    data = json.loads(message)
    # 'E' is divided by 1000 to get seconds. time.gmtime produces the same
    # UTC fields as the deprecated datetime.utcfromtimestamp.
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(data['E'] / 1000))
    symbol = data['s']
    # Locals are suffixed so that `open` does not shadow the builtin.
    open_price = data['o']
    high_price = data['h']
    low_price = data['l']
    close_price = data['c']
    volume = data['v']
    trade_count = data['n']  # No. of trades
    tick = f'tick :timestamp: {timestamp} :symbol: {symbol} :close_price: {close_price} :volume: {volume}:open_price: {open_price}:high_price: {high_price}:low_price: {low_price}:trade_qyt: {trade_count}'
    events.put(tick)
def on_close(ws, message=None, *extra):
    """Report that the connection was closed.

    The number of positional arguments passed to the close callback
    differs between websocket-client releases, so everything after ``ws``
    is optional and ignored.
    """
    print("bang")
def get_queue():
    """Print every item put on `events`, blocking while the queue is empty.

    Never returns; intended to run on a daemon thread.
    """
    while True:
        print(events.get(True))
def run():
    """Start the websocket client on a daemon thread and block while it is open."""
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp(
        socket, on_message=on_message, on_close=on_close)
    wst = threading.Thread(target=ws.run_forever)
    wst.daemon = True
    wst.start()

    def is_connected():
        # Read the attribute once: the websocket thread may reset it to
        # None between a None check and the `.connected` access.
        sock = ws.sock
        return sock is not None and sock.connected

    # `ws.sock` may still be None right after start(), so the connected
    # check must tolerate None. Stop waiting if the worker thread has
    # already died (e.g. the connection attempt failed).
    while wst.is_alive() and not is_connected():  # and conn_timeout
        print("this")
        time.sleep(1)
    while ws.sock is not None:
        print("that")
        time.sleep(10)
def main():
    """Start the queue printer on a daemon thread, then run the websocket client."""
    # Daemon consumer: it must not keep the process alive on its own.
    get_queue_th = threading.Thread(target=get_queue, daemon=True)
    get_queue_th.start()
    # Blocks the main thread while the websocket connection is open.
    run()
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
我想采用面向对象的方法,但我无法发现错误。任何帮助将不胜感激。
谢谢!
解决方案
我只是把类名的首字母改成大写(stream → Stream),其余部分保持原样。这样在我这里似乎可以正常运行。
代码:
import websocket
import json
import threading
import time
import queue
from datetime import datetime
class Stream:
    """Collect Binance ticker updates from a websocket into a queue.

    ``run()`` starts the websocket client on a daemon thread and then
    blocks; every message received is formatted by ``on_message`` and put
    on ``event_queue``; ``get_queue()`` drains that queue to stdout.
    """

    # Combined ticker stream endpoint opened by run().
    URL = "wss://stream.binance.com:9443/ws/btcusdt@ticker/ethbtc@ticker/bnbbtc@ticker/wavesbtc@ticker/stratbtc@ticker/ethup@ticker/yfiup@ticker/xrpup@ticker"

    def __init__(self, event_queue):
        """Remember the queue that receives one formatted string per tick."""
        self.event_queue = event_queue

    def on_message(self, ws, message):
        """Parse one JSON ticker payload and enqueue a formatted line.

        Args:
            ws: The websocket app that delivered the message (unused).
            message: JSON text with the keys 'E' (event time in
                milliseconds), 's', 'o', 'h', 'l', 'c', 'v' and 'n'.

        Raises:
            KeyError: If any of the keys above is missing.
        """
        data = json.loads(message)
        # 'E' is divided by 1000 to get seconds. time.gmtime produces the
        # same UTC fields as the deprecated datetime.utcfromtimestamp.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(data['E'] / 1000))
        symbol = data['s']
        # Locals are suffixed so that `open` does not shadow the builtin.
        open_price = data['o']
        high_price = data['h']
        low_price = data['l']
        close_price = data['c']
        volume = data['v']
        trade_count = data['n']  # No. of trades
        tick = f'tick :timestamp: {timestamp} :symbol: {symbol} :close_price: {close_price} :volume: {volume}:open_price: {open_price}:high_price: {high_price}:low_price: {low_price}:trade_qyt: {trade_count}'
        self.event_queue.put(tick)

    def on_close(self, ws, message=None, *extra):
        """Report that the connection was closed.

        The number of positional arguments passed to the close callback
        differs between websocket-client releases, so everything after
        ``ws`` is optional and ignored.
        """
        print("bang")

    def _is_connected(self):
        """Return True once the client has a socket that reports connected."""
        # Read the attribute once: the websocket thread may reset it to
        # None between a None check and the `.connected` access.
        sock = self.socket.sock
        return sock is not None and sock.connected

    def run(self):
        """Start the websocket client thread and block while it is open."""
        self.socket = websocket.WebSocketApp(
            self.URL,
            on_message=self.on_message,
            on_close=self.on_close)
        self.wst = threading.Thread(target=self.socket.run_forever)
        self.wst.daemon = True
        self.wst.start()

        # `sock` may still be None right after start(), so the connected
        # check must tolerate None. Stop waiting if the worker thread has
        # already died (e.g. the connection attempt failed).
        while self.wst.is_alive() and not self._is_connected():  # and conn_timeout
            print("this")
            time.sleep(1)
        while self.socket.sock is not None:
            print("that")
            time.sleep(10)

    def get_queue(self):
        """Print every queued tick forever, blocking while the queue is empty."""
        while True:
            print("This is this: ", self.event_queue.get(True))
if __name__ == "__main__":
    # Wire a shared queue between the websocket producer and the printer.
    message_queue = queue.Queue()
    stream = Stream(event_queue=message_queue)
    # Daemon consumer: it must not keep the process alive on its own.
    thread = threading.Thread(target=stream.get_queue, daemon=True)
    thread.start()
    # Blocks the main thread while the websocket connection is open.
    stream.run()
终端输出:
this
This is this: tick :timestamp: 2021-08-18 04:33:09 :symbol: BNBBTC :close_price: 0.00885000 :volume: 218003.86000000:open_price: 0.00922500:high_price: 0.00927300:low_price: 0.00870100:trade_qyt: 134006
that
This is this: tick :timestamp: 2021-08-18 04:33:09 :symbol: ETHBTC :close_price: 0.06768900 :volume: 149723.83900000:open_price: 0.06892500:high_price: 0.06994900:low_price: 0.06655500:trade_qyt: 252554
This is this: tick :timestamp: 2021-08-18 04:33:09 :symbol: WAVESBTC :close_price: 0.00049340 :volume: 201920.09000000:open_price: 0.00052000:high_price: 0.00052770:low_price: 0.00047690:trade_qyt: 16067
This is this: tick :timestamp: 2021-08-18 04:33:09 :symbol: BTCUSDT :close_price: 45032.73000000 :volume: 61188.11215700:open_price: 46346.01000000:high_price: 47160.00000000:low_price: 44203.28000000:trade_qyt: 1831000
This is this: tick :timestamp: 2021-08-18 04:33:10 :symbol: BNBBTC :close_price: 0.00885000 :volume: 218003.86000000:open_price: 0.00922500:high_price: 0.00927300:low_price: 0.00870100:trade_qyt: 134006
This is this: tick :timestamp: 2021-08-18 04:33:10 :symbol: ETHBTC :close_price: 0.06768900 :volume: 149723.83900000:open_price: 0.06892500:high_price: 0.06994900:low_price: 0.06655500:trade_qyt: 252554
This is this: tick :timestamp: 2021-08-18 04:33:10 :symbol: WAVESBTC :close_price: 0.00049340 :volume: 201920.40000000:open_price: 0.00052000:high_price: 0.00052770:low_price: 0.00047690:trade_qyt: 16068
This is this: tick :timestamp: 2021-08-18 04:33:10 :symbol: BTCUSDT :close_price: 45035.31000000 :volume: 61188.04606300:open_price: 46346.01000000:high_price: 47160.00000000:low_price: 44203.28000000:trade_qyt: 1830996
This is this: tick :timestamp: 2021-08-18 04:33:11 :symbol: BNBBTC :close_price: 0.00885000 :volume: 218003.86000000:open_price: 0.00922500:high_price: 0.00927300:low_price: 0.00870100:trade_qyt: 134006
This is this: tick :timestamp: 2021-08-18 04:33:11 :symbol: ETHBTC :close_price: 0.06768900 :volume: 149723.80700000:open_price: 0.06891900:high_price: 0.06994900:low_price: 0.06655500:trade_qyt: 252553
This is this: tick :timestamp: 2021-08-18 04:33:11 :symbol: WAVESBTC :close_price: 0.00049340 :volume: 201920.64000000:open_price: 0.00052000:high_price: 0.00052770:low_price: 0.00047690:trade_qyt: 16069
This is this: tick :timestamp: 2021-08-18 04:33:11 :symbol: BTCUSDT :close_price: 45040.84000000 :volume: 61187.84066900:open_price: 46346.01000000:high_price: 47160.00000000:low_price: 44203.28000000:trade_qyt: 1830984
This is this: tick :timestamp: 2021-08-18 04:33:12 :symbol: BNBBTC :close_price: 0.00885300 :volume: 218005.44000000:open_price: 0.00922500:high_price: 0.00927300:low_price: 0.00870100:trade_qyt: 134007
This is this: tick :timestamp: 2021-08-18 04:33:12 :symbol: ETHBTC :close_price: 0.06768000 :volume: 149723.81200000:open_price: 0.06891900:high_price: 0.06994900:low_price: 0.06655500:trade_qyt: 252554
This is this: tick :timestamp: 2021-08-18 04:33:12 :symbol: BTCUSDT :close_price: 45040.83000000 :volume: 61187.58821700:open_price: 46346.00000000:high_price: 47160.00000000:low_price: 44203.28000000:trade_qyt: 1830959
This is this: tick :timestamp: 2021-08-18 04:33:13 :symbol: BNBBTC :close_price: 0.00885300 :volume: 218000.84000000:open_price: 0.00922500:high_price: 0.00927300:low_price: 0.00870100:trade_qyt: 134005
This is this: tick :timestamp: 2021-08-18 04:33:13 :symbol: ETHBTC :close_price: 0.06767700 :volume: 149724.88400000:open_price: 0.06891900:high_price: 0.06994900:low_price: 0.06655500:trade_qyt: 252553
This is this: tick :timestamp: 2021-08-18 04:33:13 :symbol: WAVESBTC :close_price: 0.00049340 :volume: 201920.64000000:open_price: 0.00052000:high_price: 0.00052770:low_price: 0.00047690:trade_qyt: 16069
您必须安装 2 个 python 包才能运行此脚本。
pip install websocket
pip install websocket-client
如果您已经安装了这些包,请确保它们是最新版本。
推荐阅读
- java - 找到 findAny 匹配后如何停止并行流?
- javascript - Bootstrap 4标签:获取点击标签的索引号?
- python - 如何生成包含特定条目的 Kaggle 提交 CSV 文件?
- excel-formula - 忽略空白的嵌套 IF 条件
- spring - Spring Cloud Stream 错误通道不起作用
- corda - 尝试使用 RPC 连接 Corda 节点时出错
- mysql - MySQL:使用带有 IN 的函数输出
- c# - Asp.Net httpcontext.current.request 适用于运行 iis7 的虚拟 windows server 2012 但不适用于 windows server 2016 iis10
- objective-c - OCMock 通过键值观察给出错误
- audiokit - AudioKit 的 FFTTap 在重新启动时在物理设备上给出零