首页 > 解决方案 > 无法通过 Composer 中的 BigQueryOperator 将数据加载到大查询表中

问题描述

我创建了 dag 以将数据从大查询加载到另一个大查询表。我在 composer 中使用了 BigQueryOperator。但是这段代码没有按预期工作。我无法得到错误,任何人都可以帮我解决这个问题。

而且我手动创建了空表,仍然没有加载到表中的数据。请找到下面的代码,让我知道我错过了什么吗?

from typing import Any
from datetime import datetime, timedelta
import airflow
from airflow import models
from airflow.operators import bash_operator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

sql="""SELECT * FROM `project_id.dataset_name.source_table`"""

DEFAULT_ARGUMENTS = {
    "owner": "Airflow",
    "depends_on_past": False,
    "start_date": datetime(2019, 8, 7),
    "schedule_interval": '0 6 * * *',
    "retries": 10
}

dag = models.DAG(
        dag_id='Bq_to_bq',
        default_args=DEFAULT_ARGUMENTS
    )
LOAD_TABLE_TRUNC = BigQueryOperator(
    task_id ='load_bq_table_truncate',
    dag=dag,
    bql=sql,
    destination_proect_dataset_table='project-id.dataset-name.table_name',
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    allow_large_results='true',
    use_legacy_sql=False,

)

LOAD_TABLE_APPEND = BigQueryOperator(
    task_id ='load_bq_table_append',
    dag=dag,
    bql=sql,
    destination_proect_dataset_table='project-id.dataset-name.table_name',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    allow_large_results='true',
    use_legacy_sql=False,

)
LOAD_TABLE_TRUNC.set_downstream(LOAD_TABLE_APPEND)

标签: google-bigquerygoogle-cloud-composer

解决方案


这是为了找出特定于 DAG 失败的错误

您可以通过两种方式找出错误

  • 网页界面:

    1. 转到 DAG 并选择图形视图。
    2. 选择任务并单击查看日志。
  • 堆栈驱动程序日志记录:

    1. 转到此 URL https://console.cloud.google.com/logs/viewer?项目 = 项目 ID。
    2. 从第一个下拉列表中选择“Cloud Composer Environment”,然后选择位置和 DAG 名称。
    3. 从日志级别下拉列表中选择错误为。

推荐阅读