python - Pandas 到 CSV 和 asyncio
问题描述
我正在尝试从 Pandas DataFrame 编写一个 CSV,该数据帧不断附加来自交换的数据(CCXT 库,使用 Asyncio)。我面临的问题是我的 CSV 只写在 GetThicker 函数的末尾(在这种情况下,永远不会)。如何逐行编写此 CSV?
# -*- coding: utf-8 -*-
#!/usr/bin/env python
import asyncio
import os
import sys
import pandas as pd
root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(root + '/python')
import ccxt.async_support as ccxt # noqa: E402
def WriteCSV(df):
df.to_csv('Test2.csv',columns=['Date & Time', 'Closing Price'])
#print(df)
async def GetThicker(symbol):
exchange = ccxt.binance({
'enableRateLimit': True, # this option enables the built-in rate limiter
})
data = {'Date & Time' : 0,'Closing Price':0}
index = 0
df = pd.DataFrame(data, columns={'Date & Time','Closing Price'}, index=[index])
df.index.name = 'ID'
while True:
#print(exchange.iso8601(exchange.milliseconds()), 'fetching', symbol, 'ticker from', exchange.name)
try:
ticker = await exchange.fetch_ticker(symbol)
df.loc[index,'Closing Price'] = ticker['close']
df.loc[index,'Date & Time'] = ticker['datetime']
print(ticker['datetime'],'-',ticker['close'])
WriteCSV(df)
index +=1
except ccxt.RequestTimeout as e:
print('[' + type(e).__name__ + ']')
print(str(e)[0:200])
# will retry
except ccxt.DDoSProtection as e:
print('[' + type(e).__name__ + ']')
print(str(e.args)[0:200])
# will retry
except ccxt.ExchangeNotAvailable as e:
print('[' + type(e).__name__ + ']')
print(str(e.args)[0:200])
# will retry
except ccxt.ExchangeError as e:
print('[' + type(e).__name__ + ']')
print(str(e)[0:200])
break # won't retry
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(GetThicker('BTC/USDT'))
if __name__ == "__main__":
main()
#asyncio.get_event_loop().run_until_complete(GetThicker('BTC/USDT'))
有什么建议么?
谢谢
解决方案
如果您在正在运行的协程中创建交换实例,则还需要将 传递asyncio_loop
给它的构造函数。
仅当在异步上下文之外创建交换实例时,该asyncio_loop
参数才是可选的。
如果交换是在异步上下文中实例化的,就像您的情况一样,它必须知道要使用哪个异步循环。
请参阅下面的编辑代码和注释:
import asyncio
import os
import sys
import pandas as pd
root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(root + '/python')
import ccxt.async_support as ccxt # noqa: E402
def WriteCSV(df):
# print(df)
df.to_csv('Test2.csv',columns=['Date & Time', 'Closing Price'])
async def GetThicker(loop, symbol): # ←------ HERE
exchange = ccxt.binance({
'enableRateLimit': True, # this option enables the built-in rate limiter
'asyncio_loop': loop, # ←------ HERE
})
data = {'Date & Time' : 0,'Closing Price':0}
index = 0
df = pd.DataFrame(data, columns={'Date & Time','Closing Price'}, index=[index])
df.index.name = 'ID'
while True:
try:
ticker = await exchange.fetch_ticker(symbol)
df.loc[index,'Closing Price'] = ticker['close']
df.loc[index,'Date & Time'] = ticker['datetime']
print(ticker['datetime'],'-',ticker['close'])
WriteCSV(df)
index +=1
except ccxt.NetworkError as e:
print('[' + type(e).__name__ + ']')
print(str(e.args)[0:200])
# will retry
except ccxt.ExchangeError as e:
print('[' + type(e).__name__ + ']')
print(str(e)[0:200])
break # won't retry
def main():
loop = asyncio.get_event_loop()
symbol = 'BTC/USDT'
loop.run_until_complete(GetThicker(loop, symbol)) # ←------ HERE
if __name__ == "__main__":
main()
推荐阅读
- python - 为“cudf”做“diff”的最有效方法是什么
- python - node进程和python子进程之间的双向通信
- java - 我在哪里放置我的资源文件以使其在 Spring 中使用类路径可读?
- java - 本地开发不能禁用 Spring 配置服务器
- apache - htaccess 错误页面文件不适用于重写条件
- python - Python 2D 数组/列表分配给出了不需要的结果
- facebook - 为什么我无法在 Instagram API 中生成用户令牌?
- python - 在Django中单击like按钮时出错
- react-native - dispatch 不调用reducer React Native
- python - 在 pandas 中用较短的名称替换所有行值