How do I automatically upload files from Google Cloud Storage to BigQuery using Python?

Problem Description

I'm using the Python snippet below, deployed as a Cloud Function, with the goal of automatically loading CSV data from a storage bucket into a BigQuery table. The function's trigger is "whenever a new file is uploaded to the bucket". However, the code crashes. Please let me know if I'm doing something wrong.

import gcsfs  # required so pandas can read gs:// paths directly
import os
import pandas as pd
import re
import numpy as np
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound


# Configuration (hard-coded rather than read from environment variables)
metric = "availability"
table = "availability_daily_2" 

bucket = "tintin_bucket" 
staging_folder = "profitero/staging/daily/"+metric
processed_folder = "profitero/processed/daily/"+metric
dataset = "tintin_2"


# Returns a list with all blobs in a given bucket
def list_blobs(bucket):
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket)
    blobs_list = []
    for blob in blobs:
      blobs_list.append(blob.name)
    return(blobs_list)

# Function to process file names into organized data
def processFileNames(list_of_file_names):
    # Define helper functions
    def searchFunction(pattern,x): 
      output = re.search(pattern,x)
      if output is None:
        return(None)
      else:
        return(output.group(0))
    def getdates(x): return(searchFunction(r"(\d{4}-\d{2}-\d{2})",x))
    def getcountry(x): return(searchFunction(r"([A-Z]{2})",x))
    def getmetric(x): return(searchFunction(r"(Placement|Pricing|Ratings|Availability|Content|Assortment)",x)) 
    def getfiletype(x): return(searchFunction(r"(zip|csv)",x))
    def isDaily(x): return(searchFunction(r"(Daily)",x))
    # Create empty dataframe
    d = {'filename': list_of_file_names}
    df = pd.DataFrame(data=d)
    # Fill dataframe
    df['date'] = df.filename.apply(lambda x: getdates(x) )
    df['date'] = pd.to_datetime(df['date'])
    df['country'] = df.filename.apply(lambda x: getcountry(x) )
    df['metric'] = df.filename.apply(lambda x: getmetric(x) )
    df['filetype'] = df.filename.apply(lambda x: getfiletype(x) )
    df['isDaily'] = df.filename.apply(lambda x: isDaily(x) )
    df.replace('',np.nan,inplace=True)
    #df.dropna(inplace=True)
    return(df)

def cleanCols(x):
  #x = re.sub('[^0-9a-zA-Z]+', '', x)
  x = x.replace(" ", "_")
  #x = x.lower() 
  x = x.replace("-","_")
  x = x.replace("#","no")
  x = x.replace("3p","third_party")
  x = x.replace("3P","third_party")
  x = x.replace("&","and")
  x = x.replace("'","")
  return(x)

# Function to move processed blobs into processed folder
def move_blob(bucket, file):
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket)
    source_blob = source_bucket.blob(file)
    destination_bucket = storage_client.bucket(bucket)
    destination_blob_name = "profitero/processed/daily/"+metric+"/"+file.rsplit("/",1)[1]
    try:
        blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        blob_delete = source_bucket.delete_blob(file)
        print("Blob {} moved to blob {}.".format(source_blob.name,blob_copy.name))
    except NotFound:
        print("Not found error")
        pass

# Main function - Lists CSVs in bucket, reads them into memory, loads them into BigQuery
def csv_loader(data,context):
    #request_json = request.get_json(silent=True)
    print(data['name'])
    p = re.compile(r'profitero/staging/daily/'+metric+r'/.*\.csv')
    if p.match(data['name']):
        try: 
            df = pd.read_csv("gs://"+bucket+"/"+data['name'])
            print("Read CSV")
            df['event_id'] = context.event_id
            print("Attached event id")
            df['event_timestamp'] = context.timestamp
            print("Attached timestamp")
            df.rename(columns=lambda x: cleanCols(x),inplace=True)
            df['RPC'] = df['RPC'].astype(str)
            print("Cleaned column names")
            df = df[['Date', 'Country', 'Retailer', 'Product_Title', 'Match_Type', 'Availability', 'URL', 'Manufacturer', 'Brand', 'Sub_Brand', 'Account_Category','RPC']]
            print("Selected relevant columns")
            df.to_gbq(dataset+"."+table,if_exists="append",project_id="emea-devices-services") # requires the pandas-gbq package
            print("Added to table")
            move_blob(bucket,data['name'])
            print("Moved file")
        except Exception as e: 
            print(e)
    else:
        pass

    # Notify of success
    return("Success!")

Tags: python, google-cloud-platform, google-bigquery, google-cloud-functions, google-cloud-storage

Solution


There are a couple of solutions you can look into:

File size < 50 MB: Load a text file (.txt) from Cloud Storage into a BigQuery table

File size > 50 MB: Can't load a 340 MB file into BigQuery using a Cloud Function

Note: the first solution relies on the Cloud Function's compute power, while the second relies on BigQuery's compute (a minimal sketch of that approach follows below).
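
For the "BigQuery compute" route, instead of pulling the CSV into the Cloud Function's memory with pandas, you can ask BigQuery to load the file straight from Cloud Storage via a load job. The following is only a minimal sketch: it reuses the bucket, project, dataset and table names from the question, and it assumes schema autodetection is acceptable for your table.

from google.cloud import bigquery

# Background Cloud Function (GCS trigger): hand the CSV to BigQuery via a load job
def csv_loader_load_job(data, context):
    client = bigquery.Client(project="emea-devices-services")

    # URI of the file that triggered the function
    uri = "gs://" + data["bucket"] + "/" + data["name"]

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,     # skip the CSV header row
        autodetect=True,         # let BigQuery infer the schema (assumption)
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    # BigQuery reads the file from Cloud Storage itself, so the
    # function never holds the CSV in memory.
    load_job = client.load_table_from_uri(
        uri,
        "emea-devices-services.tintin_2.availability_daily_2",
        job_config=job_config,
    )
    load_job.result()  # wait for the load job to complete
    print("Loaded {} rows.".format(load_job.output_rows))

Because the load job runs inside BigQuery, the 340 MB case from the second link works as well; the trade-off is that the per-row additions from the original code (event_id, event_timestamp, column renaming) would have to be applied afterwards, for example with a follow-up query.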

