python-asyncio - ib_insync reqHistoricalDataAsync 如何与 Asyncio 一起工作?
问题描述
import asyncio
import ib_insync as ibi
import symbol_list
import time
start = time.perf_counter()
stocklist = symbol_list.test
endDateTime = '20190328 09:30:00'
durationStr='1 D'
dataDirectory = './data/tmp'
class App:
async def run(self):
self.ib = ibi.IB()
with await self.ib.connectAsync():
contracts = [
ibi.Stock(symbol, 'SMART', 'USD')
for symbol in ['AAPL', 'TSLA', 'AMD', 'INTC']]
for contract in contracts:
# self.ib.reqMktData(contract)
bars = await self.ib.reqHistoricalDataAsync(contract,
endDateTime=endDateTime, durationStr=durationStr,
barSizeSetting='5 mins', whatToShow='MIDPOINT', useRTH=True)
df = ibi.util.df(bars)
df.to_csv(f"{dataDirectory}/{contract.symbol}.csv")
async for tickers in self.ib.pendingTickersEvent:
for ticker in tickers:
print(ticker)
def stop(self):
self.ib.disconnect()
app = App()
try:
asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
app.stop()
endtime = (time.perf_counter() - start)/60
print(f"Process time: {endtime:,.2f} minutes")
请帮忙。我从async-streaming-example修改了示例代码。我没有收到任何错误消息,但它只是在没有给我 shell 提示的情况下运行。如果运行正常,这段代码应该只需要不到一分钟的时间。本质上,我想使用 reqHistoricalDataAsync 异步获取历史数据,而不是 reqMktData。我还查看了使用 ib_insync 的异步执行,但我也无法使用该技术。你能告诉我我做错了什么吗?我欢迎任何异步解决方案。谢谢你。
解决方案
推荐阅读
- java - 根据Java中的TimeZone转换时间戳
- javascript - 如何防止不需要的文件轮换?
- python - 如何更改 virtualenv 使用的默认 python 版本?
- python - PYTHON:如果字符串存在,则从 CSV 获取特定的单元格值
- json - 问题:有没有办法让 Lottie(json 文件)从一帧滚动到另一帧?
- c# - 如何根据 Asp.net Gridview 中的文本更改单元格的颜色
- xmlbeans-maven-plugin - 使用 XMLBeans 5.0.1 版和 xmlbeans-maven-plugin 5.0.1 生成模式类失败
- php - Laravel 向网页添加广告
- python - 需要拆分并检查列表元素是否在 pandas df 中可用
- python - 从视频帧文件路径数据集中获取补丁 ID