google-bigquery - 无法通过 Composer 中的 BigQueryOperator 将数据加载到大查询表中
问题描述
我创建了 dag 以将数据从大查询加载到另一个大查询表。我在 composer 中使用了 BigQueryOperator。但是这段代码没有按预期工作。我无法得到错误,任何人都可以帮我解决这个问题。
而且我手动创建了空表,仍然没有加载到表中的数据。请找到下面的代码,让我知道我错过了什么吗?
from typing import Any
from datetime import datetime, timedelta
import airflow
from airflow import models
from airflow.operators import bash_operator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
sql="""SELECT * FROM `project_id.dataset_name.source_table`"""
DEFAULT_ARGUMENTS = {
"owner": "Airflow",
"depends_on_past": False,
"start_date": datetime(2019, 8, 7),
"schedule_interval": '0 6 * * *',
"retries": 10
}
dag = models.DAG(
dag_id='Bq_to_bq',
default_args=DEFAULT_ARGUMENTS
)
LOAD_TABLE_TRUNC = BigQueryOperator(
task_id ='load_bq_table_truncate',
dag=dag,
bql=sql,
destination_proect_dataset_table='project-id.dataset-name.table_name',
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
allow_large_results='true',
use_legacy_sql=False,
)
LOAD_TABLE_APPEND = BigQueryOperator(
task_id ='load_bq_table_append',
dag=dag,
bql=sql,
destination_proect_dataset_table='project-id.dataset-name.table_name',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
allow_large_results='true',
use_legacy_sql=False,
)
LOAD_TABLE_TRUNC.set_downstream(LOAD_TABLE_APPEND)
解决方案
这是为了找出特定于 DAG 失败的错误
您可以通过两种方式找出错误
网页界面:
- 转到 DAG 并选择图形视图。
- 选择任务并单击查看日志。
堆栈驱动程序日志记录:
- 转到此 URL https://console.cloud.google.com/logs/viewer?项目 = 项目 ID。
- 从第一个下拉列表中选择“Cloud Composer Environment”,然后选择位置和 DAG 名称。
- 从日志级别下拉列表中选择错误为。
推荐阅读
- java - 我怎么会得到这个错误outofboundsException?
- react-native - 顶部选项卡导航 wix 反应本机导航应用程序正在崩溃
- c - C上的尾递归双因子 - 整数除以零
- react-native - 使用未定义状态响应本机 PanResponder 函数调用
- laravel - Chrome 无法打开 localhost:3000 BrowserSync
- flutter - 如何在卡片内添加 PopupMenuButton
- c - 如何从循环单链表中删除一个节点
- node.js - 使用打字稿节点js express返回具有接口函数类型的对象
- python - Python - 此函数中的变量如何通过字典进行?
- python - 如何使用单个 shell 实例在 Python 中运行多个命令?(避免shell启动时间)