Timeout in the Python Google Cloud Storage library when using Python multithreading

Problem description

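The problem I am trying to solve is unzipping a batch of zip files in a GCS bucket. As part of processing these files, I update each file's status in MongoDB through an API endpoint. I have a method that takes a dictionary as its argument and does the following.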

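  1. Update the file's status to PROCESSING.
  2. Download the file from the GCS bucket.
  3. Unzip the file.
  4. Upload the extracted files back to the GCS bucket.
  5. Update the file's status to FINISHED.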
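
The five steps above can be sketched as one function. This is a minimal sketch, not the asker's code: `process_zip` and the callables `download`, `upload`, and `update_status` are hypothetical stand-ins for the GCS and status-API calls, so the control flow can be run without network access. Unlike the code below, this sketch treats a non-zip input as a failure.

```python
import io
from zipfile import ZipFile, is_zipfile


def process_zip(file_info, download, upload, update_status):
    """Run the five steps for one zip file.

    download(path) -> bytes, upload(path, data) and
    update_status(file_id, status) are hypothetical callables
    supplied by the caller.
    """
    file_id = file_info["fileId"]
    path = file_info["fileLocation"]
    update_status(file_id, "PROCESSING")              # step 1
    try:
        zip_bytes = io.BytesIO(download(path))        # step 2
        if not is_zipfile(zip_bytes):
            # Assumption: a non-zip input counts as a failure here.
            raise ValueError("{} is not a zip archive".format(path))
        with ZipFile(zip_bytes, "r") as archive:      # step 3
            for name in archive.namelist():
                if name.endswith("/"):
                    continue  # skip directory entries
                upload(path + "/" + name, archive.read(name))  # step 4
    except Exception:
        update_status(file_id, "FAILED")
        raise
    update_status(file_id, "FINISHED")                # step 5
```

Passing the I/O in as callables keeps the status transitions easy to check in isolation before threads are involved.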

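Because the bucket contains multiple files to unzip, I tried to parallelize the work with Python multithreading, processing each zip file in its own thread, but after about a minute the Google library raises requests.exceptions.Timeout.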

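When I process the files without multithreading, everything works without errors. The code and stack trace are below for reference. Any help is greatly appreciated.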

def process_files_multithreading(self, file_info):
    print("process id: " + str(os.getpid()))
    try:
        print("Updating the file status to PROCESSING")
        update_file_status_url = utils.getUpdateFileStatusUrl(config["host"], file_info["fileId"])
        utils.updateFileStatus(update_file_status_url, "PROCESSING")
        client = gcs_utils._getGCSConnection()
        blob_path = file_info["fileLocation"]
        bucket = client.get_bucket(file_info["bucketName"])
        blob = bucket.blob(blob_path)
        zipbytes = io.BytesIO(blob.download_as_string())

        if is_zipfile(zipbytes):
            with ZipFile(zipbytes, 'r') as myzip:
                print(myzip.namelist())
                for contentfilename in myzip.namelist():
                    contentfile = myzip.read(contentfilename)
                    blob = bucket.blob(blob_path + "/" + contentfilename)
                    blob.upload_from_string(contentfile)
            print("processed file {} successfully".format(blob_path))
            print("updating the file status to FINISHED")
            update_file_status_url = utils.getUpdateFileStatusUrl(config["host"], file_info["fileId"])
            utils.updateFileStatus(update_file_status_url,"FINISHED")
    except Exception:
        print("Exception in processing the file")
        print("updating the file status to FAILED")
        update_file_status_url = utils.getUpdateFileStatusUrl(config["host"], file_info["fileId"])
        utils.updateFileStatus(update_file_status_url,"FAILED")
        print(traceback.format_exc())

'''
calling the above function using ThreadPoolExecutor
'''

# files is the list of dictionaries

with ThreadPoolExecutor(len(files)) as pool:
    pool.map(self.process_files_multithreading, files)
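
`ThreadPoolExecutor(len(files))` starts one thread per file, so every download and upload competes for the same bandwidth at once. One thing worth trying, offered as a sketch and not as a confirmed fix, is to cap the number of workers. The helper name `run_bounded` and the default `max_workers=4` are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_bounded(worker, items, max_workers=4):
    """Run worker(item) for every item with at most max_workers threads.

    Returns (results, errors): results maps item index -> return value,
    errors maps item index -> the exception that worker raised.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which future belongs to which input position.
        futures = {pool.submit(worker, item): i for i, item in enumerate(items)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = future.result()
            except Exception as exc:
                errors[index] = exc
    return results, errors
```

With the code above this would be called as `run_bounded(self.process_files_multithreading, files)`. Collecting `errors` also surfaces exceptions that `pool.map` only raises when its result iterator is consumed.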

Exception: Traceback (most recent call last):

    blob.upload_from_string(contentfile)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1361, in upload_from_string
    predefined_acl=predefined_acl,
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1261, in upload_from_file
    client, file_obj, content_type, size, num_retries, predefined_acl
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1171, in _do_upload
    client, stream, content_type, size, num_retries, predefined_acl
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1118, in _do_resumable_upload
    response = upload.transmit_next_chunk(transport)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 425, in transmit_next_chunk
    retry_strategy=self._retry_strategy,
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/resumable_media/requests/_helpers.py", line 136, in http_request
    return _helpers.wait_and_retry(func, RequestsMixin._get_status_code, retry_strategy)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/resumable_media/_helpers.py", line 150, in wait_and_retry
    response = func()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/auth/transport/requests.py", line 287, in request
    **kwargs
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/google/auth/transport/requests.py", line 110, in __exit__
    raise self._timeout_error_type()
requests.exceptions.Timeout

Tags: python, google-cloud-storage, python-multithreading

Solution
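
The traceback ends in `transmit_next_chunk`, meaning a single chunk of a resumable upload did not finish in time. The "about a minute" delay is consistent with a per-request time limit of roughly that length. A possible workaround, given as a sketch under assumptions and not as a confirmed fix, is to allow each request more time and to send smaller chunks so that each request carries less data. Assumptions: the installed google-cloud-storage release accepts a `timeout` keyword on `upload_from_string` (the release shown in the traceback may predate it), and the helper name `upload_patiently` and its default values are hypothetical.

```python
CHUNK_UNIT = 256 * 1024  # resumable upload chunk sizes must be a multiple of 256 KiB


def upload_patiently(blob, data, timeout=300, chunk_size=4 * CHUNK_UNIT):
    """Upload data through blob with a longer timeout and smaller chunks.

    blob is expected to behave like google.cloud.storage.Blob.
    timeout (seconds) and chunk_size (bytes) are illustrative defaults.
    """
    if chunk_size % CHUNK_UNIT != 0:
        raise ValueError("chunk_size must be a multiple of 256 KiB")
    blob.chunk_size = chunk_size
    # Assumption: this library version supports the timeout keyword.
    blob.upload_from_string(data, timeout=timeout)
```

In the question's loop, `blob.upload_from_string(contentfile)` would become `upload_patiently(blob, contentfile)`. Using fewer threads than `len(files)` should also reduce contention.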

