首页 > 解决方案 > Python:如何将 API 请求中的文本/CSV 内容异步发送到 Google Cloud Storage?

问题描述

我希望从 API 下载文本/CSV 内容（每个 API 调用大约需要 5-6 分钟才能完成下载），然后将内容作为 CSV 文件发送到 Google Cloud Storage。我正在使用 Python 的 asyncio 库（配合 aiohttp）异步执行这些 API 请求，并将 CSV 文件异步上传到云存储。

这是我的第一次尝试，但我不确定这样做是否正确。欢迎任何见解或指导。

Python 代码：

async def upload_to_bucket_async(bucket_name, folder, dataset_id, timestamp, blob_name, file_obj,
                                 session=None):
    """Upload one CSV payload to a Cloud Storage bucket.

    The object is stored under ``{folder}/{dataset_id}/{timestamp}/{blob_name}``.

    Args:
        bucket_name: Target bucket.
        folder: Top-level prefix for the object name.
        dataset_id: Second path component.
        timestamp: Third path component.
        blob_name: Final object name (e.g. ``"data.csv"``).
        file_obj: Payload handed to ``Storage.upload`` (bytes in the callers shown here).
        session: Optional existing ``aiohttp.ClientSession`` to reuse. When omitted,
            a short-lived session is opened and closed for this single upload
            (the original behaviour). Reusing a session avoids building a new
            connection pool for every file.

    Returns:
        Whatever ``Storage.upload`` returns for the uploaded object.
    """
    object_name = f'{folder}/{dataset_id}/{timestamp}/{blob_name}'

    if session is not None:
        # Caller owns the session's lifetime; do not close it here.
        storage = Storage(session=session)
        return await storage.upload(bucket_name, object_name, file_obj)

    async with aiohttp.ClientSession() as own_session:
        storage = Storage(session=own_session)
        return await storage.upload(bucket_name, object_name, file_obj)

async def get_csv_async(
              session: aiohttp.ClientSession,
              url,
              version,
              dataset_id,
              timestamp,
              bucket_name,
              token,
              **kwargs
              ):
    """Download one CSV snapshot from the API and upload it to Cloud Storage.

    Args:
        session: Shared HTTP session used for the download request.
        url: %-format template filled with ``(dataset_id, timestamp)``.
        version: Value sent in the ``X-API-Version`` header.
        dataset_id: Dataset identifier; also used in the storage path.
        timestamp: Snapshot timestamp; also used in the storage path.
        bucket_name: Destination Cloud Storage bucket.
        token: API token, sent as ``Authorization: token <token>``.
        **kwargs: Extra keyword arguments forwarded to ``session.get``.

    Returns:
        A tuple ``(request_url, upload_status)``.

    Raises:
        aiohttp.ClientResponseError: If the API responds with a 4xx/5xx status.
            Without this check, the error body would be stored as ``data.csv``.
    """
    request_url = url % (dataset_id, timestamp)
    headers = {
        'Authorization': 'token ' + token,
        'X-API-Version': version,
        'Accept': 'text/csv',
        'Accept-Encoding': 'gzip',
    }

    async with session.get(request_url, headers=headers, **kwargs) as response:
        # Fail fast on HTTP errors instead of uploading an error page as CSV.
        response.raise_for_status()
        # NOTE: the whole body is buffered in memory before being uploaded.
        payload = await response.read()

    # The body is fully read, so the download connection can go back to the pool
    # before the (potentially slow) upload starts.
    res = await upload_to_bucket_async(bucket_name=bucket_name,
                                       folder="thinknum",
                                       dataset_id=dataset_id,
                                       timestamp=timestamp,
                                       blob_name="data.csv",
                                       file_obj=payload
                                       )

    return request_url, res

async def get_all_csvs_async(url, version, dataset_id, timestamps, bucket_name, token, **kwargs):
    """Fetch and upload the CSV for every timestamp concurrently.

    All requests share a single HTTP session whose total timeout is 900 seconds.
    Results come back in the same order as ``timestamps``. Because
    ``return_exceptions=True`` is used, a failed request shows up as its
    exception object in the result list instead of cancelling the others.
    """
    async with aiohttp.ClientSession(timeout=ClientTimeout(total=900)) as http:
        # One coroutine per timestamp; gather schedules them all at once.
        return await asyncio.gather(
            *(
                get_csv_async(
                    session=http,
                    url=url,
                    version=version,
                    dataset_id=dataset_id,
                    timestamp=ts,
                    bucket_name=bucket_name,
                    token=token,
                    **kwargs,
                )
                for ts in timestamps
            ),
            return_exceptions=True,
        )


def run_calls(self, token, dataset_id, timestamps, batch_size):
    """Download and upload history for ``timestamps``, one batch at a time.

    ``timestamps`` is split with ``chunks(timestamps, batch_size)``. Each batch
    is run to completion in its own ``asyncio.run`` call, so batches execute
    sequentially while the requests inside a batch run concurrently.

    Returns:
        A list with one entry per batch, each being the list produced by
        ``get_all_csvs_async`` for that batch.
    """
    batches = list(chunks(timestamps, batch_size))
    self.log(msg='Running API Calls to ThinkNum for Retrieving History (%s batches)' % len(batches),
             level=logging.INFO)

    history = [
        # asyncio.run blocks until this batch finishes, so batches never overlap.
        asyncio.run(get_all_csvs_async(self.csv_data_endpoint, self.version, dataset_id,
                                       batch, self.bucket_name, token))
        for batch in batches
    ]

    self.log(msg='Successfully Completed API Calls to ThinkNum for All Batches', level=logging.INFO)

    return history

标签: python, asynchronous, python-requests, google-cloud-storage, python-asyncio

解决方案


推荐阅读