linux - 从 Airflow 作业执行 SparkSubmitOperator 时出错
问题描述
背景:我创建了一个新的 Airflow Job/Task DAG,我在其中使用了 SparkSubmitOperator。我在我的桌面上同时运行 Spark 和 Airflow(版本等如下)。DAG 工作正常,直到它到达 Spark 作业的提交部分。我尝试使用以下选项更改连接。无论我尝试什么,我都会在 Airflow 日志中收到以下消息。
Airflow 识别连接并尝试使用它但失败了。
如果我从命令提示符提交目标 DataPipelineExample.py,它将毫无问题地运行。
问题:是什么阻止了 Airflow 识别并使用连接来触发本地以执行 spark-submit?
Airflow.exceptions.AirflowException:无法执行:spark-submit --master http://localhost:4040 --name mySparkSubmitJob
桌面:Linux Mint VERSION="19.3 (Tricia)" Spark:版本 2.4.5 Pyspark:版本 2.4.5 Airflow:版本:1.10.9 Python 3.7.4(默认,2019 年 8 月 13 日,20:35:49)java 版本“1.8.0_241”
使用或尝试过的气流连接 localhost 4040 spark://localhost 4040 http://localhost:4040 http://specific ip address:4040 Host: localhost Port: 4040 / Extras, No Extras etc. Extras : { "root.default" ,:“spark_home”:“”,“spark_binary”:“spark-submit”,“命名空间”:“默认”}
路径信息
export SCALA_HOME=~/anaconda3/share/scala-2.11.1
export SPARK_HOME=/usr/local/spark
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH=$PATH:/usr/local/spark/bin
低于完整的 DAG。这编译并被 Python 和 Airflow 完全识别。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': 'me@gmail.com',
'depends_on_past': False,
'start_date': datetime(2020, 3, 17),
'email': ['me@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2030, 3, 17),
}
dag = DAG(dag_id = 'a_data_pipelne_job', default_args=default_args, schedule_interval='*/45 * * * *')
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job_02',
conn_id='spark_local',
application = "/home/me/.config/spyder-py3/DataPipelineExample.py",
name='airflowspark-DataLoaderMongo',
verbose=True,
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
解决方案
我可以通过使用 SSHOperator 来解决这个问题。它比 SparkSubmitOperator 更不容易受到环境配置问题的影响。SparkSubmit 在本地 pyspark home 的上下文中通过 SSH 调用。为你的 python 脚本添加路径参数,你就可以开始了。
dag = DAG(dag_id = 'a_pjm_data_pipelne__ssh_job',
default_args=default_args,
schedule_interval='*/60 * * * *',
params={'project_source': '/home/me/.config/spyder-py3',
'spark_submit': '/usr/local/spark/bin/spark-submit DataPipelineExample.py'})
templated_bash_command = """
echo 'HOSTNAME: localhost' #To check that you are properly connected to the host
cd {{ params.project_source }}
{{ params.spark_submit }}
"""
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
submit_spark_task = SSHOperator(
task_id="SSH_task",
ssh_conn_id='ssh_default',
command=templated_bash_command,
dag=dag
推荐阅读
- node.js - 选择端口以运行本地角度和节点应用程序的最佳实践?
- c# - 在工具中显示俄语文本的问题。(UTF8)
- excel - 在Excel中查找一行中最高的单元格值并返回相邻的单元格值
- cakephp - CakePHP3 中的条纹实现
- r - 在 ggplot2 中使用自由 y 轴重新定位 face_wrapped 图中的轴标签
- java - 无法在适配器中使用包 - Java Android Studio
- html - 如何在另一个 Ul 框中显示一个 UL 框以及中心的文本?
- unity3d - 无法从脚本构建 Unity WebGL 项目
- android - Android 测试编排器:com.android.ddmlib.InstallException:-26
- ibm-midrange - 在 QShell AS/400 (iSeries) 中表示字节