python - 如何使用 Python 自动将文件从 Google Cloud Storage 上传到 Big Query?
问题描述
我正在使用一段 Python 代码,并通过 Cloud Functions 部署它。目标是:每当存储桶中上传新文件时触发该函数,自动将 CSV 数据从存储桶加载到 BigQuery 表中。然而,代码崩溃了;如果我哪里做错了,请告诉我。
import gcsfs
import os
import pandas as pd
import re
import numpy as np
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound
# Environment variables
# NOTE(review): despite the heading, these are hard-coded module-level
# constants, not values read from the environment — confirm whether they
# should come from os.environ when the function is deployed.
metric = "availability"  # Profitero metric this function instance handles
table = "availability_daily_2"  # target BigQuery table name
bucket = "tintin_bucket"  # GCS bucket the trigger watches
staging_folder = "profitero/staging/daily/"+metric  # where newly uploaded CSVs land
processed_folder = "profitero/processed/daily/"+metric  # where handled CSVs are moved
dataset = "tintin_2"  # target BigQuery dataset
# Returns a list with all blobs in a given bucket
def list_blobs(bucket):
    """Return the object names of every blob in *bucket*."""
    client = storage.Client()
    return [blob.name for blob in client.list_blobs(bucket)]
# Function to process file names into organized data
def processFileNames(list_of_file_names):
    """Parse a list of blob names into a DataFrame of extracted attributes.

    Each regex below pulls one attribute out of the file name; a column
    holds None where the pattern is absent. The 'date' column is converted
    to datetime and empty strings are normalized to NaN.
    """
    def _first_match(pattern, text):
        found = re.search(pattern, text)
        return found.group(0) if found else None

    # Column name -> extraction pattern, in the column order of the result.
    patterns = {
        'date': r"(\d{4}-\d{2}-\d{2})",
        'country': r"([A-Z]{2})",
        'metric': r"(Placement|Pricing|Ratings|Availability|Content|Assortment)",
        'filetype': r"(zip|csv)",
        'isDaily': r"(Daily)",
    }

    df = pd.DataFrame(data={'filename': list_of_file_names})
    for column, pattern in patterns.items():
        df[column] = df.filename.apply(lambda name, p=pattern: _first_match(p, name))
    df['date'] = pd.to_datetime(df['date'])
    df.replace('', np.nan, inplace=True)
    return df
def cleanCols(x):
    """Normalize a column name into a BigQuery-safe identifier.

    Applies the substitutions in order (order matters: spaces and dashes
    become underscores before the symbolic replacements).
    """
    replacements = (
        (" ", "_"),
        ("-", "_"),
        ("#", "no"),
        ("3p", "third_party"),
        ("3P", "third_party"),
        ("&", "and"),
        ("'", ""),
    )
    for old, new in replacements:
        x = x.replace(old, new)
    return x
# Function to move processed blobs into processed folder
def move_blob(bucket, file):
    """Move *file* into the processed/ folder of *bucket* (copy, then delete).

    The destination bucket is the same bucket; only the prefix changes.
    Relies on the module-level ``metric`` constant for the target path.
    """
    client = storage.Client()
    source_bucket = client.bucket(bucket)
    destination_bucket = client.bucket(bucket)
    source_blob = source_bucket.blob(file)
    base_name = file.rsplit("/", 1)[1]
    destination_blob_name = "profitero/processed/daily/"+metric+"/"+base_name
    try:
        blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        source_bucket.delete_blob(file)
        print("Blob {} moved to blob {}.".format(source_blob.name, blob_copy.name))
    except NotFound:
        # Best-effort: a missing source blob is logged, not raised.
        print("Not found error")
# Main function - Lists CSVs in bucket, reads them into memory, loads them into BigQuery
def csv_loader(data, context):
    """Cloud Function entry point: load a newly uploaded staging CSV into BigQuery.

    Args:
        data: GCS event payload; ``data['name']`` is the uploaded object path.
        context: Event metadata providing ``event_id`` and ``timestamp``.

    Returns:
        A status string (returned even when the file is skipped or fails,
        so the function is not retried).
    """
    print(data['name'])
    # Raw string: the original used "\/" escapes in a normal string, which is
    # an invalid escape sequence (DeprecationWarning now, SyntaxError later)
    # and unnecessary in a regex anyway.
    # NOTE(review): ".*csv" is not anchored at the end — confirm whether only
    # names ending in ".csv" should match.
    p = re.compile(r"profitero/staging/daily/" + metric + r"/.*csv")
    if p.match(data['name']):
        try:
            df = pd.read_csv("gs://"+bucket+"/"+data['name'])
            print("Read CSV")
            df['event_id'] = context.event_id
            print("Attached event id")
            df['event_timestamp'] = context.timestamp
            print("Attached timestamp")
            df.rename(columns=cleanCols, inplace=True)
            # RPC codes may parse as ints; force string for a stable schema.
            df['RPC'] = df['RPC'].astype(str)
            print("Cleaned column names")
            df = df[['Date', 'Country', 'Retailer', 'Product_Title', 'Match_Type',
                     'Availability', 'URL', 'Manufacturer', 'Brand', 'Sub_Brand',
                     'Account_Category', 'RPC']]
            print("Selected relevant columns")
            # Use the module-level dataset constant instead of the previously
            # hard-coded "tintin_2" (same value, single source of truth).
            df.to_gbq(dataset+"."+table, if_exists="append",
                      project_id="emea-devices-services")
            print("Added to table")
            move_blob(bucket, data['name'])
            print("Moved file")
        except Exception as e:
            # Best-effort: log and fall through so the trigger is not retried forever.
            print(e)
    # Notify of success (fixed "Sucess!" typo)
    return("Success!")
解决方案
您可以研究多种解决方案:
文件大小 < 50 MB: 将 Cloud Storage 中的文本文件 (.txt) 加载到 BigQuery 表中
文件大小 > 50 MB: 无法使用 Cloud Function 将 340 MB 文件加载到 BigQuery
注意:第一个解决方案利用 Cloud Function 的计算能力,而第二个解决方案利用 BigQuery 计算能力。
推荐阅读
- python - 在选择另一个小部件时更改 PyQt5 中的背景
- node.js - 具有不同列的多个文件的数据结构
- github - GH 使用远程存储库时是否可以在 DOM 中显示存储库?
- excel - 在 VBA 中创建状态栏或进度栏
- neo4j - NestJS / Neo4j 仅从我的数据库的一张表中读取所有信息
- ruby-on-rails - Figma 导出的 PNG 在集成到 rails 应用程序时质量下降
- javascript - 带有 jsGrid 的 ASPNETCORE 5.0(loadData 问题)
- python - 在 Geopandas 地图上绘制六角网格
- amazon-web-services - OSError:[Errno 28] AWS EC2 实例中的设备上没有剩余空间
- python-3.x - 如何告诉 Django 模型只有一个或其他字段不能为空?