python - Python:如何将 API 请求中的文本/CSV 内容异步发送到 Google Cloud Storage?
问题描述
我希望从 API 下载文本/CSV 内容(每个 API 调用大约需要 5-6 分钟才能完成下载),然后将内容作为 csv 文件发送到云存储。我正在使用 Python 的 Async 库来执行这些 API 请求并将 CSV 文件异步加载到云存储中。
这是我的第一次尝试,但我不确定我是否做得正确。欢迎任何见解/指导?
蟒蛇代码:
async def upload_to_bucket_async(bucket_name, folder, dataset_id, timestamp, blob_name, file_obj):
"""Upload CSV Files to Cloud Storage Buckets"""
async with aiohttp.ClientSession() as session:
storage = Storage(session=session)
status = await storage.upload(bucket_name, f'{folder}/{dataset_id}/{timestamp}/{blob_name}', file_obj)
return status
async def get_csv_async(
session: aiohttp.ClientSession,
url,
version,
dataset_id,
timestamp,
bucket_name,
token,
**kwargs
):
async with session.get(
url % (dataset_id, timestamp),
headers={
'Authorization': 'token ' + token,
'X-API-Version': version,
'Accept': 'text/csv',
'Accept-Encoding': 'gzip'
},
**kwargs) as response:
res = await upload_to_bucket_async(bucket_name=bucket_name,
folder="thinknum",
dataset_id=dataset_id,
timestamp=timestamp,
blob_name="data.csv",
file_obj=await response.read()
)
return url % (dataset_id, timestamp), res
async def get_all_csvs_async(url, version, dataset_id, timestamps, bucket_name, token, **kwargs):
timeout = ClientTimeout(total=900)
async with aiohttp.ClientSession(timeout=timeout) as session:
futures = [get_csv_async(session=session, url=url, version=version, dataset_id=dataset_id, timestamp=t,
bucket_name=bucket_name, token=token,
**kwargs) for t in timestamps]
return await asyncio.gather(*futures, return_exceptions=True)
def run_calls(self, token, dataset_id, timestamps, batch_size):
ts_in_batches = list(chunks(timestamps, batch_size))
self.log(msg='Running API Calls to ThinkNum for Retrieving History (%s batches)' % len(ts_in_batches),
level=logging.INFO)
all_history = []
for ts in ts_in_batches:
res = asyncio.run(get_all_csvs_async(self.csv_data_endpoint, self.version, dataset_id, ts, self.bucket_name, token))
all_history.append(res)
self.log(msg='Successfully Completed API Calls to ThinkNum for All Batches', level=logging.INFO)
return all_history
解决方案
推荐阅读
- java - .CSV 以逗号分隔
- python - 安装 scikit-learn Docker 镜像问题
- r - 永久设置 R 3.5.1 工作目录
- c# - c# 在类中搜索随机属性值
- c++ - 未定义对符号“_ZN3ros10NodeHandle9subscribeERNS_16SubscribeOptionsE”的引用
- reactjs - 道具改变时map()导致的意外渲染
- laravel-5 - 如何在我的 Laravel Nova 字段中保存其他数据?
- php - 如何通过使用 excel 、 csv 在图像上写入值
- java - Java 反转 Switch 语句
- reactjs - Heroku 中的部署 React/Node 应用程序失败