python - How to batch upload data from a pandas dataframe in Datalab into a BigQuery table
Problem description
I am trying to insert new data into a table that already exists in BigQuery. After processing the data and building a dataframe, I want to append the new rows to the table, and I am looking for an efficient way to do it.
What I am currently doing is:
def export_items_to_bigquery(ID_recipien, action_date_hour, monday, tuesday,
                             wednesday, thursday, friday, saturday, sunday,
                             click_action, read_action, converted,
                             imedeate_response, weight, weight_mu,
                             processing_date):
    # Instantiate a client
    bigquery_client = bigquery.Client()
    # Prepare references to the dataset and table
    dataset_ref = bigquery_client.dataset('internal')
    table_ref = dataset_ref.table('Processed_data')
    table = bigquery_client.get_table(table_ref)  # API call
    rows_to_insert = [(ID_recipien, action_date_hour, monday, tuesday,
                       wednesday, thursday, friday, saturday, sunday,
                       click_action, read_action, converted,
                       imedeate_response, weight, weight_mu,
                       processing_date)]
    errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
    # assert errors == []
    print(errors)
for i in range(len(out)):
    hrs = int(out.iloc[i].hours)
    ID_recipient = int(out.iloc[i].ID_recipient)
    Last_Date = str(out.iloc[i].Last_Date)
    last_id_send = int(out.iloc[i].last_id_send)
    unique_send_id_action = int(out.iloc[i].unique_send_id_action)
    click_count = int(out.iloc[i].click_count)
    read_count = int(out.iloc[i].read_count)
    Monday = int(out.iloc[i].Monday)
    Tuesday = int(out.iloc[i].Tuesday)
    Wednesday = int(out.iloc[i].Wednesday)
    Thursday = int(out.iloc[i].Thursday)
    Friday = int(out.iloc[i].Friday)
    Saturday = int(out.iloc[i].Saturday)
    Sunday = int(out.iloc[i].Sunday)
    Total_day_active = int(out.iloc[i].Total_day_active)
    click_read_mulrtiplier = int(out.iloc[i].click_read_mulrtiplier)
    click_read_weight = int(out.iloc[i].click_read_weight)
    final_weight = int(out.iloc[i].final_weight)
    last_hour = int(out.iloc[i].last_hour)
    last_email = int(out.iloc[i].last_email)
    last_log_id = int(out.iloc[i].last_log_id)
    export_items_to_bigquery(hrs, ID_recipient, Last_Date, last_id_send,
                             unique_send_id_action, click_count, read_count,
                             Monday, Tuesday, Wednesday, Thursday, Friday,
                             Saturday, Sunday, Total_day_active,
                             click_read_mulrtiplier, click_read_weight,
                             final_weight, last_hour, last_email, last_log_id)
This is very inefficient.
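Much of the cost in the loop above is one `get_table` plus one `insert_rows` API round trip per row. A minimal sketch of a batched alternative (the helper names and the `internal.Processed_data` table id are assumptions, not the asker's exact setup) converts the whole dataframe to row dicts and streams them in a single `insert_rows_json` call:

```python
import pandas as pd

def dataframe_to_rows(df: pd.DataFrame) -> list:
    # One dict per row, keyed by column name; JSON-serializable as long
    # as the dataframe holds plain ints/floats/strings.
    return df.to_dict(orient="records")

def stream_dataframe(df: pd.DataFrame, table_id: str) -> None:
    # One API request for the whole dataframe instead of one per row.
    from google.cloud import bigquery
    client = bigquery.Client()
    errors = client.insert_rows_json(table_id, dataframe_to_rows(df))
    if errors:
        raise RuntimeError(f"insert failed: {errors}")
```

Something like `stream_dataframe(out, 'internal.Processed_data')` would then replace the whole loop; `insert_rows_json` accepts a table id string directly, so no per-row `get_table` call is needed.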
I also tried:
new_data.to_gbq('internal.intermediate_table_weight_calc', 'ids-internal', chunksize=10000, if_exists='append')
But it fails with "Please verify that the structure and data types in the DataFrame match the schema of the destination table", even though all the column names are identical and every dtype is int.
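When `to_gbq` rejects a dataframe like this, explicitly casting the columns to the dtypes the destination table expects can make the schema check pass. A minimal sketch (the `expected` mapping is hypothetical; in practice it would mirror the real table's schema):

```python
import pandas as pd

def align_dtypes(df: pd.DataFrame, expected: dict) -> pd.DataFrame:
    # Cast each listed column to the dtype the destination table expects;
    # columns not listed in `expected` are left untouched.
    out = df.copy()
    for col, dtype in expected.items():
        if col in out.columns:
            out[col] = out[col].astype(dtype)
    return out
```

For example, `align_dtypes(new_data, {"click_count": "int64"})` would force a column that silently became float64 (a common side effect of NaNs or arithmetic) back to int64 before calling `to_gbq`.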
Solution
import datetime as dt
import json
from google.cloud import bigquery

t1 = dt.datetime.now()

def format_schema(schema):
    formatted_schema = []
    for row in schema:
        formatted_schema.append(
            bigquery.SchemaField(row['name'], row['type'], row['mode']))
    return formatted_schema

# Only needed if loading via JSON instead of directly from the dataframe;
# add lines=True to get newline-delimited format:
# json_data = new_data.head(4).to_json(orient='records', lines=True)
# json_object = json.loads(json_data)

project_id = 'mailkit-internal'
dataset_id = 'internal'
table_id = 'intermediate_table_weight_calc'

client = bigquery.Client(project=project_id)
dataset = client.dataset(dataset_id)
table = dataset.table(table_id)

job_config = bigquery.LoadJobConfig()
# source_format only applies to the JSON path above; load_table_from_dataframe
# serializes via parquet (hence the pyarrow dependency):
# job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
# job_config.schema = format_schema(table_schema)

job = client.load_table_from_dataframe(new_data, table, job_config=job_config)
print(job.result())
This works. Hope it helps. Make sure pyarrow and fastparquet are installed (`!pip install pyarrow` and `!pip install fastparquet` in Datalab), and that the column names and types of the dataframe match the BigQuery table.
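If the auto-detected schema still mismatches the table, an explicit schema can be derived from the dataframe's dtypes and passed through `format_schema` above. A minimal sketch; the `type_map` is an assumption covering only a few common pandas dtypes, not the full BigQuery type system:

```python
import pandas as pd

def schema_from_dtypes(df: pd.DataFrame) -> list:
    # Map pandas dtypes to BigQuery column types; anything unmapped falls
    # back to STRING. Extend the map for dates/timestamps as needed.
    type_map = {"int64": "INTEGER", "float64": "FLOAT",
                "bool": "BOOLEAN", "object": "STRING"}
    return [{"name": col,
             "type": type_map.get(str(dtype), "STRING"),
             "mode": "NULLABLE"}
            for col, dtype in df.dtypes.items()]
```

Setting `job_config.schema = format_schema(schema_from_dtypes(new_data))` would then force the load job to use this schema instead of inferring one.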