首页 > 解决方案 > Cloud Function python的问题

问题描述

** 我有一个谷歌云功能,它需要连接到 url 并以 csv 文件的形式获取数据并存储在一个存储桶中。这是用python代码写的。

当我测试该函数时,它编译成功,但根本不工作。当我检查日志时,它给出了下面提到的错误。

  1. favt_LnT_acn_blackline_data_pull_func43jttmffma0g AccessSecretVersionRequest 的构造函数输入无效:'projects/gcp-favt-acn-rpt-dev/secrets/blackline_api_key/versions/latest'

请找到代码并提出建议。

谢谢,维塔尔**

'
import base64
import logging
import requests
#import pandas as pd
#from pandas import json_normalize
import json
import os
import datetime
from datetime import datetime as dt
import pytz
from google.cloud import storage
from google.cloud import secretmanager

def delete_and_upload_blob(landing_bucket_name,
                source_file_name,
                landing_blob_name,
                retention_bucket_name,
                file_retention_flag,
                retn_file_suffix,
                rpt_last_run_file):
    """Replace a blob in the landing bucket and optionally retain a dated copy.

    Deletes any existing object with the target name, uploads the new file,
    copies it to the retention bucket when retention is enabled, and finally
    refreshes the last-run-times CSV in the landing bucket.

    Args:
        landing_bucket_name: GCS bucket that receives the report CSV.
        source_file_name: Local path of the report file to upload.
        landing_blob_name: Destination object name in the landing bucket.
        retention_bucket_name: Bucket keeping dated copies of daily files.
        file_retention_flag: 'Y' to copy the upload into the retention bucket.
        retn_file_suffix: Date string prefixed to the retained copy's name.
        rpt_last_run_file: Local path of the last-run-times CSV to upload.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(landing_bucket_name)
    blob = bucket.blob(landing_blob_name)
    rpt_last_run_blob = bucket.blob('some.csv')
    retention_bucket = storage_client.bucket(retention_bucket_name)

    # Delete any stale copy first so the upload always replaces the object.
    if blob.exists(storage_client):
        blob.delete()
        print('File {} is deleted from Cloud Storage before Upload'
              .format(landing_blob_name))
    else:
        print('No Such File Exists in Storage Bucket to Delete. So, '
              'proceeding with Upload')

    blob.upload_from_filename(source_file_name)
    print("File {} uploaded to Bucket {} With Name {}."
          .format(source_file_name, bucket, landing_blob_name))

    if file_retention_flag == 'Y':
        # Copy the last file of the day into the retention bucket
        # under a date-prefixed name.
        new_file_name = retn_file_suffix + '_' + landing_blob_name
        blob_copy = bucket.copy_blob(blob, retention_bucket, new_file_name)
        print('File {} is copied to Retention Bucket {}'
              .format(new_file_name, retention_bucket))

        if rpt_last_run_blob.exists(storage_client):
            # Delete the old last-run file before re-uploading it below.
            rpt_last_run_blob.delete()
            print('File {} is deleted from Cloud Storage before Upload'
                  .format(rpt_last_run_blob))
        else:
            print('No Such File Exists in Storage Bucket to Delete. So, '
                  'proceeding with Upload')

    # Always refresh the last-run-times file in the landing bucket.
    rpt_last_run_blob.upload_from_filename(rpt_last_run_file)
    print("File {} uploaded to Bucket {} With Name {}."
          .format(rpt_last_run_file, bucket, 'Reports_Latest_Run_time.csv'))

def api_request():
    """Pull finished Blackline report CSVs and land them in Cloud Storage.

    Reads the API key from Secret Manager, obtains an OAuth token via the
    password grant, lists query runs, and for every report that finished
    within the last two hours downloads its export and hands it to
    delete_and_upload_blob() along with a last-run-times CSV.
    """
    et = pytz.timezone("US/Eastern")
    current_et_time = dt.now().astimezone(et)
    print('Current ET Time:', current_et_time)

    pt = pytz.timezone("US/Pacific")
    ut = pytz.timezone("UTC")

    blackline_base_url = "https://....com"
    blackline_sts_url = blackline_base_url + "/authorize/connect/token"

    project_id = 'gcp-favt-acn-dev'
    secret_id = '###_api_key'
    secret_client = secretmanager.SecretManagerServiceClient()
    secret_name = secret_client.secret_version_path(project_id, secret_id, 'latest')
    # BUG FIX: newer secretmanager clients reject a bare positional string
    # ("Invalid constructor input for AccessSecretVersionRequest"); the
    # resource name must be passed as a keyword argument (or as
    # request={"name": secret_name}).
    secret_resp = secret_client.access_secret_version(name=secret_name)
    api_key = secret_resp.payload.data.decode('UTF-8')

    grant_type = 'password'
    scope = '####'
    username = '####'

    payload = ('grant_type=' + grant_type + '&scope=' + scope +
               '&username=' + username + '&password=' + api_key)

    sts_headers = {
        'Authorization': 'Basic dXBzOk5KXXx2VENsSiEtRw==',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Cookie': 'BLSIAPPEN=!bpJj4AOTHPcaqipWtDI6FrozN629M9xYLA/'
                  'sbM1DWVH+jjuY5fgHVMACha2rIapXRoB7CcqnlaHgBw==',
    }
    response = requests.request("POST", blackline_sts_url,
                                headers=sts_headers, data=payload)

    if response.ok:
        sts_response = response.json()
        access_token = sts_response['access_token']
        print(access_token)

        blackline_rpt_submit_url = blackline_base_url + '/api/queryruns'
        rpt_payload = ''
        blackline_rpt_api_headers = {
            'Authorization': 'Bearer {}'.format(access_token),
            'Content-Type': 'text/plain',
        }
        rpt_resp = requests.request("GET", blackline_rpt_submit_url,
                                    headers=blackline_rpt_api_headers,
                                    data=rpt_payload)
        print(rpt_resp.text)
        jl = json.loads(rpt_resp.text)
        reports_list = []

        rprts_filename = "tmp_rprts.csv"
        rprts_full_path = os.path.join("/tmp", rprts_filename)
        with open(rprts_full_path, 'w') as f:
            f.write('ReportName,ReportLastRunTime' + '\n')

        # Only reports that finished within the last two hours are pulled.
        hrs = -2
        hrs_to_subtract = datetime.timedelta(hours=hrs)
        two_hrs_ago_time = current_et_time + hrs_to_subtract
        frmtd_curr_time = two_hrs_ago_time.strftime('%Y-%m-%d %H:%M:%S')
        # Round-trip through strftime/strptime to get a naive datetime
        # comparable with the naive converted end times below.
        latest_rpt_check_time = dt.strptime(frmtd_curr_time, '%Y-%m-%d %H:%M:%S')
        print("Latest Report Check Time:", latest_rpt_check_time)

        for each in jl:
            # API endTime values are Pacific local time; convert to ET for
            # the freshness check and to UTC for the last-run log.
            strpd_time = dt.strptime(each['endTime'][0:19], '%Y-%m-%dT%H:%M:%S')
            pt_localize = pt.localize(strpd_time)
            et_time = pt_localize.astimezone(et)
            frmtd_et_time = et_time.strftime('%Y-%m-%d %H:%M:%S')
            cnvrted_endTime = dt.strptime(frmtd_et_time, '%Y-%m-%d %H:%M:%S')
            ut_time = pt_localize.astimezone(ut)
            frmtd_ut_time = ut_time.strftime('%Y-%m-%d %H:%M:%S')

            if cnvrted_endTime > latest_rpt_check_time:
                reports_list.append({each['name']: each['exportUrls'][0]["url"]})
                rpt_last_run = each['name'] + ',' + frmtd_ut_time
                print(rpt_last_run)
                with open(rprts_full_path, 'a') as f:
                    f.write(rpt_last_run + '\n')
                retn_file_suffix = each['endTime'][0:10]
                rpt_run_hr = cnvrted_endTime.hour
                print(reports_list)

                for report in reports_list:
                    for k in report:
                        print(report[k])
                        report_fetch_url = blackline_base_url + '/' + report[k]
                        print('Report Fetch URL: {}'.format(report_fetch_url))
                        filename = "temp_file.csv"
                        full_path = os.path.join("/tmp", filename)
                        rpt_data = requests.request(
                            "GET", report_fetch_url,
                            headers=blackline_rpt_api_headers)
                        print(rpt_data.text)
                        with open(full_path, 'wb') as tmp_file:
                            tmp_file.write(rpt_data.content)

                        # Upload it to Cloud Storage.
                        landing_bucket_name = "####_dev_landing_bkt"  # CHANGE ME
                        source_file_name = os.path.join(full_path)
                        rpt_last_run_file = os.path.join(rprts_full_path)
                        landing_blob_name = '##.csv'  # CHANGE ME
                        retention_bucket_name = '####_dev_retention_bkt'
                        print('file retention check')
                        # Retain only files produced at/after 22:00 ET.
                        if rpt_run_hr >= 22:
                            file_retention_flag = 'Y'
                        else:
                            file_retention_flag = 'N'
                        print(file_retention_flag)
                        delete_and_upload_blob(landing_bucket_name,
                                               source_file_name,
                                               landing_blob_name,
                                               retention_bucket_name,
                                               file_retention_flag,
                                               retn_file_suffix,
                                               rpt_last_run_file)
                        # Remove the temp file right after upload to avoid
                        # OOM/disk issues in the Cloud Function; doing it
                        # here also guarantees full_path is bound.
                        os.remove(full_path)

        # Remove the tmp last-run file after upload.
        os.remove(rprts_full_path)

    #def pacific_to_eastern_conversion(pacific_time, eastern_time):
    def main(event,context):

    try:
        if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
        else:
        name = 'World'
        print('Hello{}',format(name))   
       api_request()
       except Exception as e:
         logging.error(e)'    enter code here

标签: pythongoogle-cloud-platformgoogle-cloud-functions

解决方案


您使用的方法适用于 Cloud Run,但不适用于 Cloud 函数。

要在 Google Cloud 功能中使用 Secret,请执行以下步骤:

  1. 确保函数的运行时服务帐户必须被授予对 secret 的访问权限。要将 Secret Manager 与 Cloud Functions 一起使用,请将角色/secretmanager.secretAccessor 角色分配给与您的函数关联的服务帐户。

  2. 使函数可以访问秘密。这可以使用 Google Cloud Console 或 gcloud 命令行工具来完成。

我将秘密公开为环境变量(名称设置为“api_key”)并在代码中访问它们,如下所述:

import os

api_key = os.environ.get('api_key')

我希望这回答了你的问题。


推荐阅读