docker - Connecting Apache Spark and Apache Airflow in a Docker-based setup
Problem description
I have Spark and Airflow clusters, and I want to submit Spark jobs from the Airflow container to the Spark container. I am new to Airflow, though, and I don't know what configuration is needed. I copied spark_submit_operator.py into the plugins folder.
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 7, 31)
}

dag = DAG('spark_example_new', default_args=args, schedule_interval="*/10 * * * *")

operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    java_class='Simple',
    application='/spark/abc.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='1',
    name='airflow-spark-example',
    verbose=False,
    driver_memory='1g',
    application_args=["1000"],
    conf={'master': 'spark://master:7077'},
    dag=dag,
)
master is the hostname of our Spark master container. When I run the DAG, it produces the following error:
[2018-09-20 05:57:46,637] {{models.py:1569}} INFO - Executing <Task(SparkSubmitOperator): spark_submit_job> on 2018-09-20T05:57:36.756154+00:00
[2018-09-20 05:57:46,637] {{base_task_runner.py:124}} INFO - Running: ['bash', '-c', 'airflow run spark_example_new spark_submit_job 2018-09-20T05:57:36.756154+00:00 --job_id 4 --raw -sd DAGS_FOLDER/firstJob.py --cfg_path /tmp/tmpn2hznb5_']
[2018-09-20 05:57:47,002] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,001] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-09-20 05:57:47,312] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,311] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2018-09-20 05:57:47,428] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,428] {{models.py:258}} INFO - Filling up the DagBag from /usr/local/airflow/dags/firstJob.py
[2018-09-20 05:57:47,447] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,447] {{cli.py:492}} INFO - Running <TaskInstance: spark_example_new.spark_submit_job 2018-09-20T05:57:36.756154+00:00 [running]> on host e6dd59dc595f
[2018-09-20 05:57:47,471] {{logging_mixin.py:95}} INFO - [2018-09-20 05:57:47,470] {{spark_submit_hook.py:283}} INFO - Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--conf', 'master=spark://master:7077', '--num-executors', '1', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'airflow-spark-example', '--class', 'Simple', '/spark/ugur.jar', '1000']
[2018-09-20 05:57:47,473] {{models.py:1736}} ERROR - [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1633, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 168, in execute
self._hook.submit(self._application)
File "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 330, in submit
**kwargs)
File "/usr/local/lib/python3.6/subprocess.py", line 709, in __init__
restore_signals, start_new_session)
File "/usr/local/lib/python3.6/subprocess.py", line 1344, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
It is running this command:
Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--conf', 'master=spark://master:7077', '--num-executors', '1', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'airflow-spark-example', '--class', 'Simple', '/spark/ugur.jar', '1000']
But I am not using YARN.
Solution
If you use SparkSubmitOperator, the connection to the master defaults to "yarn" no matter which master you set in your Python code (the 'master' key in the conf dict is passed through as an ordinary --conf option, not as --master). However, you can override the master by passing a conn_id to the operator's constructor, provided that conn_id is already defined under the "Admin" -> "Connections" menu in the Airflow web UI. I hope this helps.
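As a sketch, the same connection can also be created from the Airflow CLI instead of the web UI. This assumes the Airflow 1.10-era CLI syntax that matches the version in the question's logs; the conn_id spark_docker is a hypothetical name, and the host/port are taken from the question's spark://master:7077:

```shell
# Hypothetical connection "spark_docker" pointing at the Spark master
# container; run inside the Airflow container (Airflow 1.10 CLI syntax).
airflow connections --add \
    --conn_id spark_docker \
    --conn_type spark \
    --conn_host spark://master \
    --conn_port 7077
```

Then pass conn_id='spark_docker' to SparkSubmitOperator instead of 'spark_default', and the hook builds --master spark://master:7077 from the connection rather than falling back to yarn. Note that spark-submit itself must also be on the PATH inside the Airflow worker container, or the FileNotFoundError above will persist.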