python - Need help running spark-submit in Apache Airflow
Question
I am relatively new to Python and Airflow and am having a very difficult time getting spark-submit to run in an Airflow task. My goal is to get the following DAG task to run successfully:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 7, 8)
}

dag = DAG('CustomCreate_test2',
          default_args=default_args,
          schedule_interval=timedelta(days=1))

t3 = BashOperator(
    task_id='run_test',
    bash_command='spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
    dag=dag
)
I know the problem lies with Airflow and not with the bash command itself, because running spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar directly in the terminal succeeds.
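For what it's worth, a command that works in an interactive terminal can still fail under Airflow, because the worker does not load your shell profile, so its PATH may be missing the directory that holds spark-submit. A minimal stdlib check of that difference (the fallback path below is a guess; substitute the output of `which spark-submit` from the terminal where the command works):

```python
import shutil

# Look up spark-submit the same way the worker's shell would.
spark_submit = shutil.which('spark-submit')
if spark_submit is None:
    # Hypothetical absolute path -- replace with the output of
    # `which spark-submit` from the terminal where the command works.
    spark_submit = '/usr/local/bin/spark-submit'

print(spark_submit)
```

Using the absolute path in bash_command removes the dependency on the worker's PATH.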
I have been getting the following error in the Airflow logs:
...
[2019-08-28 15:55:34,750] {bash_operator.py:132} INFO - Command exited with return code 1
[2019-08-28 15:55:34,764] {taskinstance.py:1047} ERROR - Bash command failed
Traceback (most recent call last):
File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
result = task_copy.execute(context=context)
File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 136, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
...
I have also tried working with the SparkSubmitOperator(...), but have had no successful runs with it; I only ever end up with error logs like the following:
...
[2019-08-28 15:54:49,749] {logging_mixin.py:95} INFO - [2019-08-28 15:54:49,749] {spark_submit_hook.py:427} INFO - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2019-08-28 15:54:49,803] {taskinstance.py:1047} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--num-executors', '2', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'CustomCreate', '--class', 'CLASSPATH.CustomCreate', '--verbose', '--queue', 'root.default', '--deploy-mode', 'cluster', '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar']. Error code is: 1.
...
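Note the literal `~/IdeaProjects/...` in the failing command above: the hook behind SparkSubmitOperator launches spark-submit as a subprocess without a shell, so the `~` is never expanded and the jar cannot be found. A short sketch of that distinction, assuming the jar lives under the home directory:

```python
import os

# '~' is a shell convenience; a subprocess started without a shell
# receives it literally and fails to locate the jar.
raw_path = '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar'
jar_path = os.path.expanduser(raw_path)

print(jar_path)  # absolute path with the home directory filled in
```

Passing an already-absolute application path to the operator sidesteps the issue.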
Is there something I have to do with SparkSubmitOperator(...) before I can run the spark-submit command in a BashOperator(...) task?
Is there a way to run my spark-submit command directly from a SparkSubmitOperator(...) task?
Is there anything I have to change in spark_default on the Admin->Connections page of Airflow?
Is there anything that must be set on the Admin->Users page of Airflow? Is there anything that must be set to allow Airflow to run Spark, or to run a jar file created by a specific user? If so, what/how?
Solution
I found a workaround for this problem.
Create a new SSH connection (or edit the default one) on the Airflow Admin->Connections page with the settings below (originally shown as an image, "Airflow SSH Connection Example"; here is the text version):
Conn ID: ssh_connection
Conn Type: SSH
Host: HOST IP ADDRESS
Username: HOST USERNAME
Password: HOST PASSWORD
Port:
Extra: {"key_file": "/PATH TO HOME DIR/airflow/.ssh/id_rsa", "allow_host_key_change": "true", "no_host_key_check": "true"}
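One easy mistake here: the Extra field must be strict JSON with straight double quotes, and the key_file path must not contain stray spaces, or the SSH hook will fail to parse it. A quick stdlib sanity check, using the placeholder path from above:

```python
import json

# The Extra field of the connection, as strict JSON with straight quotes.
extra = ('{"key_file": "/PATH TO HOME DIR/airflow/.ssh/id_rsa", '
         '"allow_host_key_change": "true", "no_host_key_check": "true"}')

parsed = json.loads(extra)  # raises ValueError if the JSON is malformed
print(parsed['key_file'])
```

Pasting the Extra value through json.loads like this before saving the connection catches quoting problems early.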
Then make the corresponding adjustments to your Python script:
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 8, 28)
}

dag = DAG('custom-create',
          default_args=default_args,
          schedule_interval=timedelta(days=1),
          params={'project_source': '~/IdeaProjects/custom-create-job',
                  'spark_submit': '/usr/local/bin/spark-submit',
                  'classpath': 'CLASSPATH.CustomCreate',
                  'jar_file': 'build/libs/custom-create.jar'}
          )

templated_bash_command = """
    echo "HOSTNAME: $HOSTNAME"  # double quotes so $HOSTNAME expands; confirms you reached the right host
    cd {{ params.project_source }}
    {{ params.spark_submit }} --class {{ params.classpath }} {{ params.jar_file }}
"""

t1 = SSHOperator(
    task_id="SSH_task",
    ssh_conn_id='ssh_connection',
    command=templated_bash_command,
    dag=dag
)
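To answer the SparkSubmitOperator question directly: in principle the same job can be submitted without the SSH detour, provided the spark_default connection on Admin->Connections points at your Spark master and the application path is absolute (the hook does not expand `~`, as the error log above shows). This is an untested configuration sketch reusing the dag object defined above, not part of the original workaround:

```python
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Sketch only: assumes spark_default's Host is set to your Spark master
# and that the jar path is absolute. /Users/matcordo2 is the home
# directory visible in the traceback; adjust to your machine.
t2 = SparkSubmitOperator(
    task_id='spark_submit_task',
    application='/Users/matcordo2/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
    java_class='CLASSPATH.CustomCreate',
    conn_id='spark_default',
    dag=dag
)
```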
I hope this solution helps anyone else who runs into a similar problem.