python - Error loading Spark job results into BigQuery with Apache Airflow
Problem description
I want an Airflow task to load the results of my Spark job into BigQuery. The results are JSON files in the usual Spark output layout, i.e. part-xxxxxxx.json.
Here is my code:
from datetime import datetime, timedelta, date
from airflow import models, DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataProcPySparkOperator, DataprocClusterDeleteOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators import BashOperator
from airflow.models import *
from airflow.utils.trigger_rule import TriggerRule

current_date = str(date.today())
BUCKET = "gs://bigdata-etl-2_flights"
PROJECT_ID = "bigdata-etl-2"
PYSPARK_JOB = BUCKET + "/spark-job/spark-etl-job-1.py"

DEFAULT_DAG_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.utcnow(),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "project_id": PROJECT_ID,
    "scheduled_interval": "0 5 * * *"
}

with DAG("flights_etl_testing", default_args=DEFAULT_DAG_ARGS) as dag:
    create_cluster = DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        master_machine_type="n1-standard-1",
        worker_machine_type="n1-standard-2",
        num_workers=2,
        region="asia-east1",
        zone="asia-east1-a"
    )

    submit_pyspark = DataProcPySparkOperator(
        task_id="run_pyspark_etl",
        main=PYSPARK_JOB,
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        region="asia-east1"
    )

    bq_load_delays_by_distance = GoogleCloudStorageToBigQueryOperator(
        task_id="bq_load_avg_delays_by_distance",
        bucket=BUCKET,
        source_objects=["flights_data_output/" + current_date + "_distance_category/*.json"],
        destination_project_dataset_table=f"{PROJECT_ID}:data_flights.avg_delays_by_distance",
        autodetect=True,
        source_format="NEWLINE_DELIMITED_JSON",
        create_disposition="CREATE_IF_NEEDED",
        skip_leading_rows=0,
        write_disposition="WRITE_APPEND",
        max_bad_records=0
    )

    bq_load_delays_by_flight_nums = GoogleCloudStorageToBigQueryOperator(
        task_id="bq_load_delays_by_flight_nums",
        bucket=BUCKET,
        source_objects=["flights_data_output/" + current_date + "_flight_nums/*.json"],
        destination_project_dataset_table=f"{PROJECT_ID}:data_flights.avg_delays_by_flight_nums",
        autodetect=True,
        source_format="NEWLINE_DELIMITED_JSON",
        create_disposition="CREATE_IF_NEEDED",
        skip_leading_rows=0,
        write_disposition="WRITE_APPEND",
        max_bad_records=0
    )

    delete_cluster = DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="ephemeral-spark-cluster-{{ds_nodash}}",
        region="asia-east1",
        trigger_rule=TriggerRule.ALL_DONE
    )

    delete_tranformed_files = BashOperator(
        task_id="delete_tranformed_files",
        bash_command="gsutil -m rm -r " + BUCKET + "/flights_data_output/*"
    )

    create_cluster.dag = dag
    create_cluster.set_downstream(submit_pyspark)
    submit_pyspark.set_downstream(bq_load_delays_by_distance)
    bq_load_delays_by_distance.set_downstream(bq_load_delays_by_flight_nums)
    bq_load_delays_by_flight_nums.set_downstream(delete_cluster)
    delete_cluster.set_downstream(delete_tranformed_files)
Task 1 (create_cluster) completes successfully, and so does task 2 (submitting the Spark job).
But on task 3 I get this error:
*** Reading remote log from gs://asia-east2-airflow-flight-j-442c4eb8-bucket/logs/flights_etl_testing/bq_load_avg_delays_by_distance/2021-03-20T17:25:55.218034+00:00/2.log.
[2021-03-20 17:36:06,559] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: flights_etl_testing.bq_load_avg_delays_by_distance 2021-03-20T17:25:55.218034+00:00 [queued]>
[2021-03-20 17:36:06,945] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: flights_etl_testing.bq_load_avg_delays_by_distance 2021-03-20T17:25:55.218034+00:00 [queued]>
[2021-03-20 17:36:06,946] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-03-20 17:36:06,946] {taskinstance.py:882} INFO - Starting attempt 2 of 2
[2021-03-20 17:36:06,946] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------
[2021-03-20 17:36:06,980] {taskinstance.py:902} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): bq_load_avg_delays_by_distance> on 2021-03-20T17:25:55.218034+00:00
[2021-03-20 17:36:06,985] {standard_task_runner.py:54} INFO - Started process 3724 to run task
[2021-03-20 17:36:07,074] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'flights_etl_testing', 'bq_load_avg_delays_by_distance', '2021-03-20T17:25:55.218034+00:00', '--job_id', '109', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/spark-bq-dag.py', '--cfg_path', '/tmp/tmpnsb9j5cm']
[2021-03-20 17:36:07,082] {standard_task_runner.py:78} INFO - Job 109: Subtask bq_load_avg_delays_by_distance
[2021-03-20 17:36:07,597] {logging_mixin.py:112} INFO - Running <TaskInstance: flights_etl_testing.bq_load_avg_delays_by_distance 2021-03-20T17:25:55.218034+00:00 [running]> on host airflow-worker-c5555fddc-vmtwq
[2021-03-20 17:36:08,126] {gcp_api_base_hook.py:145} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.@-@{"workflow": "flights_etl_testing", "task-id": "bq_load_avg_delays_by_distance", "execution-date": "2021-03-20T17:25:55.218034+00:00"}
[2021-03-20 17:36:08,505] {taskinstance.py:1152} ERROR - <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/bigdata-etl-2/jobs?alt=json returned "Source URI must be a Google Cloud Storage location: gs://gs://bigdata-etl-2_flights/flights_data_output/2021-03-20_distance_category/*.json". Details: "Source URI must be a Google Cloud Storage location: gs://gs://bigdata-etl-2_flights/flights_data_output/2021-03-20_distance_category/*.json">
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
    encryption_configuration=self.encryption_configuration)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1302, in run_load
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1323, in run_with_configuration
    .execute(num_retries=self.num_retries)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 915, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/bigdata-etl-2/jobs?alt=json returned "Source URI must be a Google Cloud Storage location: gs://gs://bigdata-etl-2_flights/flights_data_output/2021-03-20_distance_category/*.json". Details: "Source URI must be a Google Cloud Storage location: gs://gs://bigdata-etl-2_flights/flights_data_output/2021-03-20_distance_category/*.json"
[2021-03-20 17:36:08,509] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=flights_etl_testing, task_id=bq_load_avg_delays_by_distance, execution_date=20210320T172555, start_date=20210320T173606, end_date=20210320T173608
[2021-03-20 17:36:12,827] {local_task_job.py:102} INFO - Task exited with return code 1
I expected task 3 (and the following load task) to create the BigQuery tables avg_delays_by_distance and avg_delays_by_flight_nums.
Please help, and explain why I get this error when task 3 runs.
Solution
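The error message itself shows the cause: the source URI handed to BigQuery is gs://gs://bigdata-etl-2_flights/..., i.e. the gs:// scheme appears twice. GoogleCloudStorageToBigQueryOperator expects its bucket parameter to be the bare bucket name and prepends gs:// itself when building the source URIs, but BUCKET in the DAG already carries the scheme. Below is a minimal sketch of the fix, keeping the rest of the DAG unchanged; BUCKET_URI is a helper name introduced here for the places that still need a full URI.

# GoogleCloudStorageToBigQueryOperator builds "gs://{bucket}/{object}"
# itself, which is why the log shows the doubled prefix "gs://gs://...".
BUCKET = "bigdata-etl-2_flights"    # bare bucket name for bucket= arguments
BUCKET_URI = "gs://" + BUCKET       # full URI where one is required (helper introduced here)
PYSPARK_JOB = BUCKET_URI + "/spark-job/spark-etl-job-1.py"

bq_load_delays_by_distance = GoogleCloudStorageToBigQueryOperator(
    task_id="bq_load_avg_delays_by_distance",
    bucket=BUCKET,                  # no "gs://" prefix here
    source_objects=["flights_data_output/" + current_date + "_distance_category/*.json"],
    destination_project_dataset_table=f"{PROJECT_ID}:data_flights.avg_delays_by_distance",
    autodetect=True,
    source_format="NEWLINE_DELIMITED_JSON",
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_APPEND"
)

Apply the same bucket=BUCKET change to bq_load_delays_by_flight_nums, and use BUCKET_URI in the bash_command of delete_tranformed_files (and for PYSPARK_JOB, as above), since gsutil and the Dataproc main argument do need the full gs:// URI. As a side note, "scheduled_interval" in DEFAULT_DAG_ARGS is a typo for schedule_interval, and that parameter belongs on the DAG constructor rather than in default_args, so the cron expression is currently ignored and the DAG falls back to the default daily schedule.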