首页 > 解决方案 > Airflow 中的多个 BashOperator 无法识别当前文件夹

问题描述

我正在使用 Airflow 来查看是否可以为我的数据摄取做同样的工作,原始摄取通过 shell 中的两个步骤完成:

  1. 光盘 ~/bm3
  2. ./bm3.py runjob -p projectid -j jobid

在 Airflow 中,我对 BashOperator 有两个任务:

task1 = BashOperator(
    task_id='switch2BMhome',
    bash_command="cd /home/pchoix/bm3",
    dag=dag)

task2 = BashOperator(
    task_id='kickoff_bm3',
    bash_command="./bm3.py runjob -p client1 -j ingestion",
    dag=dag)

task1 >> task2

task1 按预期完成,日志如下:

[2019-03-01 16:50:17,638] {bash_operator.py:100} INFO - Temporary script location: /tmp/airflowtmpkla8w_xd/switch2ALhomeelbcfbxb
[2019-03-01 16:50:17,638] {bash_operator.py:110} INFO - Running command: cd /home/rxie/al2

由于日志中显示的原因,task2 失败:

[2019-03-01 16:51:19,896] {bash_operator.py:100} INFO - Temporary script location: /tmp/airflowtmp328cvywu/kickoff_al2710f17lm
[2019-03-01 16:51:19,896] {bash_operator.py:110} INFO - Running command: ./bm32.py runjob -p client1 -j ingestion
[2019-03-01 16:51:19,902] {bash_operator.py:119} INFO - Output:
[2019-03-01 16:51:19,903] {bash_operator.py:123} INFO - /tmp/airflowtmp328cvywu/kickoff_al2710f17lm: line 1: ./bm3.py: No such file or directory

因此,似乎每个任务都是从一个看似唯一的临时文件夹执行的,但第二个任务失败了。

如何从特定位置运行 bash 命令?

如果您可以在这里分享任何想法,我们将不胜感激。

非常感谢。

更新:感谢您几乎可行的建议。

首先bash_command="cd /home/pchoix/bm3 && ./bm3.py runjob -p client1 -j ingestion",工作正常,但是其中runjob有多个任务,第一个任务有效,第二个任务调用 impala-shell.py 运行某些东西, impala-shell.py 将 python2 指定为其解释器语言,而在它之外,其他部分正在使用 python 3。

当我只是在 shell 中运行 bash_command 时,这是可以的,但是在 Airflow 中,由于未知原因,尽管我设置了正确的 PATH 并确保在 shell 中:

(base) (venv) [pchoix@hadoop02 ~]$ python
Python 2.6.6 (r266:84292, Jan 22 2014, 09:42:36)

任务仍然在python 3内执行,使用python 3,从日志中可以看出:

[2019-03-01 21:42:08,040] {bash_operator.py:123} INFO -   File "/data/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/bin/../lib/impala-shell/impala_shell.py", line 220
[2019-03-01 21:42:08,040] {bash_operator.py:123} INFO -     print '\tNo options available.'
[2019-03-01 21:42:08,040] {bash_operator.py:123} INFO -                                   ^
[2019-03-01 21:42:08,040] {bash_operator.py:123} INFO - SyntaxError: Missing parentheses in call to 'print'

请注意,当我在 shell 环境中运行作业时,此问题不存在:

./bm3.py runjob -p client1 -j ingestion

标签: airflow

解决方案


怎么样:

task = BashOperator(
    task_id='switch2BMhome',
    bash_command="cd /home/pchoix/bm3 && ./bm3.py runjob -p client1 -j ingestion",
    dag=dag)

推荐阅读