How to batch upload data from a pandas dataframe in Datalab to a BigQuery table

Problem description

I am trying to insert new data into a table that already exists in BigQuery. After processing the data and building a dataframe, I want to append the new rows to that table, and I am looking for an efficient way to do it.

What I am doing at the moment is:

from google.cloud import bigquery

def export_items_to_bigquery(ID_recipien, action_date_hour, monday, tuesday, wednesday, thursday, friday, saturday, sunday, click_action, read_action, converted, imedeate_response, weight, weight_mu, processing_date):
  # Instantiate a client
  bigquery_client = bigquery.Client()
  # Prepare references to the dataset and table
  dataset_ref = bigquery_client.dataset('internal')
  table_ref = dataset_ref.table('Processed_data')
  table = bigquery_client.get_table(table_ref)  # API call
  # Single-row streaming insert: one API request per function call
  rows_to_insert = [(ID_recipien, action_date_hour, monday, tuesday, wednesday, thursday, friday, saturday, sunday, click_action, read_action, converted, imedeate_response, weight, weight_mu, processing_date)]
  errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
  # assert errors == []
  print(errors)

for i in range(len(out)): 
    hrs=int(out.iloc[i].hours)
    ID_recipient=int(out.iloc[i].ID_recipient)
    Last_Date=str(out.iloc[i].Last_Date)
    last_id_send=int(out.iloc[i].last_id_send)
    unique_send_id_action=int(out.iloc[i].unique_send_id_action)
    click_count=int(out.iloc[i].click_count)
    read_count=int(out.iloc[i].read_count)
    Monday=int(out.iloc[i].Monday)
    Tuesday=int(out.iloc[i].Tuesday)
    Wednesday=int(out.iloc[i].Wednesday)
    Thursday=int(out.iloc[i].Thursday)
    Friday=int(out.iloc[i].Friday)
    Saturday=int(out.iloc[i].Saturday)
    Sunday=int(out.iloc[i].Sunday)
    Total_day_active=int(out.iloc[i].Total_day_active)
    click_read_mulrtiplier=int(out.iloc[i].click_read_mulrtiplier)
    click_read_weight=int(out.iloc[i].click_read_weight)
    final_weight=int(out.iloc[i].final_weight)
    last_hour=int(out.iloc[i].last_hour)
    last_email=int(out.iloc[i].last_email)
    last_log_id=int(out.iloc[i].last_log_id)
    export_items_to_bigquery(hrs,ID_recipient,Last_Date, last_id_send, unique_send_id_action,click_count, read_count, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday, Total_day_active, click_read_mulrtiplier, click_read_weight, final_weight, last_hour, last_email,last_log_id)

This is very inefficient.
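
One thing that can help even before changing the approach is to batch all rows into a single insert_rows call instead of one API request per row; a minimal sketch, assuming the dataframe's column order matches the Processed_data schema and the batch stays within the streaming-insert request limits:

from google.cloud import bigquery

bigquery_client = bigquery.Client()
table_ref = bigquery_client.dataset('internal').table('Processed_data')
table = bigquery_client.get_table(table_ref)  # fetch the schema once

# Build every row as a tuple in the dataframe's column order,
# then send them in one streaming-insert request instead of one per row
rows_to_insert = [tuple(row) for row in out.itertuples(index=False)]
errors = bigquery_client.insert_rows(table, rows_to_insert)
print(errors)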

I have also tried:

new_data.to_gbq('internal.intermediate_table_weight_calc', 'ids-internal', chunksize=10000, if_exists='append')

But it fails with "Please verify that the structure and data types in the DataFrame match the schema of the destination table", even though all the column names are identical and every dtype is int.
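
That message is pandas-gbq's schema comparison failing, so it can usually be cleared by mirroring the destination table exactly: same column order, plus an explicit table_schema so nothing has to be inferred from the dtypes. A sketch, assuming a pandas-gbq version that accepts the table_schema argument:

from google.cloud import bigquery

client = bigquery.Client(project='ids-internal')
bq_table = client.get_table(client.dataset('internal').table('intermediate_table_weight_calc'))

# Mirror the destination schema: same column order, and an explicit
# table_schema so pandas-gbq does not infer types from the dtypes
schema = [{'name': f.name, 'type': f.field_type} for f in bq_table.schema]
new_data = new_data[[f.name for f in bq_table.schema]]

new_data.to_gbq('internal.intermediate_table_weight_calc', 'ids-internal',
                chunksize=10000, if_exists='append',
                table_schema=schema)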

Tags: python, google-bigquery, google-cloud-datalab

Solution


import json
import datetime as dt

from google.cloud import bigquery

t1 = dt.datetime.now()  # rough timing marker

def format_schema(schema):
    # Turn a list of {'name', 'type', 'mode'} dicts into BigQuery SchemaField objects
    formatted_schema = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
    return formatted_schema

### Convert the dataframe to JSON records (only needed for the JSON load variant below)
json_data = new_data.head(4).to_json(orient = 'records')
json_object = json.loads(json_data)

project_id = 'mailkit-internal'
dataset_id = 'internal'
table_id = 'intermediate_table_weight_calc'

client = bigquery.Client(project = project_id)
dataset = client.dataset(dataset_id)
table = dataset.table(table_id)

job_config = bigquery.LoadJobConfig()
# source_format / schema are only needed for the JSON load variant shown below;
# load_table_from_dataframe serializes the dataframe itself (via pyarrow)
#job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
#job_config.schema = format_schema(table_schema)

# One load job for the whole dataframe instead of a streaming insert per row
job = client.load_table_from_dataframe(new_data, table, job_config = job_config)
print(job.result())
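
The json_object, format_schema and NEWLINE_DELIMITED_JSON pieces above only come into play if the records are loaded as JSON rather than straight from the dataframe; a sketch of that variant, assuming a google-cloud-bigquery version that provides load_table_from_json and a hypothetical table_schema list of {'name', 'type', 'mode'} dicts:

# Alternative: load the JSON records prepared above instead of the dataframe
json_job_config = bigquery.LoadJobConfig()
json_job_config.schema = format_schema(table_schema)  # table_schema is a placeholder here
json_job = client.load_table_from_json(json_object, table, job_config=json_job_config)
print(json_job.result())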

This works. Hope it helps. Make sure pyarrow and fastparquet are installed (!pip install pyarrow and !pip install fastparquet) and that the column names and types of the dataframe match the BigQuery table.
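
In a Datalab notebook the installs are ordinary cells, and the name/type condition can be checked against the live table before loading; a short sketch reusing the client, table and new_data objects from above:

!pip install pyarrow
!pip install fastparquet

# Print both sides: the destination table's schema and the dataframe's dtypes
print([(field.name, field.field_type) for field in client.get_table(table).schema])
print(new_data.dtypes)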

