Airflow scheduler cannot execute a Windows EXE through WSL

Question

My Windows 10 machine has Airflow 1.10.11 installed in WSL 2 (Ubuntu-20.04).

I have a BashOperator task that calls an .EXE on Windows (via /mnt/c/... or via a symlink). The task fails. The log shows:

[2020-12-16 18:34:11,833] {bash_operator.py:134} INFO - Temporary script location: /tmp/airflowtmp2gz6d79p/download.legacyFilesnihvszli
[2020-12-16 18:34:11,833] {bash_operator.py:146} INFO - Running command: /mnt/c/Windows/py.exe
[2020-12-16 18:34:11,836] {bash_operator.py:153} INFO - Output:
[2020-12-16 18:34:11,840] {bash_operator.py:159} INFO - Command exited with return code 1
[2020-12-16 18:34:11,843] {taskinstance.py:1150} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/dist-packages/airflow/operators/bash_operator.py", line 165, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2020-12-16 18:34:11,844] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=test-dag, task_id=download.files, execution_date=20201216T043701, start_date=20201216T073411, end_date=20201216T073411

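That's all: return code 1, with no further useful information.

Running the same EXE directly from bash works perfectly, with no errors. I also tried it with a program of my own that writes to the console: in bash it prints its output fine, but through the Airflow scheduler it fails with the same error 1.

Further details and things I did to rule out other problems:

The question is: what does Airflow's use of Python's subprocess module (which the Airflow scheduler uses to run Bash operators) do differently from a "normal" bash session that results in error 1?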
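One way to narrow this down is to reproduce, outside Airflow, roughly what the log above suggests: the command is written to a temporary script and run by bash from a temporary directory, with its output captured instead of attached to a terminal. The sketch below is an approximation under those assumptions, not Airflow's actual implementation. `run_like_task` is a hypothetical helper name. Merging stderr into stdout makes any message printed by the EXE or by WSL visible.

```python
import os
import subprocess
import tempfile
from typing import Tuple


def run_like_task(command: str) -> Tuple[int, str]:
    """Run `command` through bash from a temporary directory, capturing output.

    Assumption: this only approximates how a BashOperator task runs
    (temporary script, temporary working directory, piped output).
    """
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
        script = os.path.join(tmp_dir, "task_script.sh")
        with open(script, "w") as handle:
            handle.write(command)
        completed = subprocess.run(
            ["bash", script],
            cwd=tmp_dir,                # not the directory you tested from
            env=os.environ.copy(),      # compare with `env` in your shell
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,   # merge stderr so nothing is lost
            universal_newlines=True,
        )
    return completed.returncode, completed.stdout
```

Calling `run_like_task("/mnt/c/Windows/py.exe")` once from an interactive WSL shell and once from inside a task, then printing both results, would show whether the working directory, the environment, or the missing terminal makes the difference.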

Tags: airflow, windows-subsystem-for-linux, airflow-scheduler, wsl-2

Answer


You can use Python's subprocess and sys libraries together with PowerShell.

In the Airflow dags folder, create two files: main.py and caller.py.

main.py calls caller.py, and caller.py reaches into the Windows machine to run the file or routine.

Code for main.py:

# Import the libraries used in this example
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Basic default arguments
default_args = {
    'owner': 'your_name_here',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'retries': 0,
}

# Name the DAG and define when it runs (a crontab expression also works,
# for example to run the DAG every day at 8 am)
with DAG(
    'Main',
    schedule_interval=timedelta(minutes=1),
    catchup=False,
    default_args=default_args,
) as dag:

    # The task the DAG performs: run a Python program through a bash command.
    # The file name is case-sensitive on Linux, so it must match caller.py exactly.
    t1 = BashOperator(
        task_id='caller',
        bash_command="""
        cd /home/[Your_Users_Name]/airflow/dags/
        python3 caller.py
        """,
    )

    # To add a second task, copy t1, rename it to t2 and have it call file.py

    # Execution order: only t1 for now
    t1

    # With a second task, t1 runs first and then triggers t2:
    # t1 >> t2

Code for caller.py:

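import subprocess
import sys

# Command that PowerShell runs on the Windows side; pick the line matching your file type
command = "cd C:\\Users\\[Your_Users_Name]\\Desktop; python file.py"    # file .py
# command = "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.html"    # file .html
# command = "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.bat"     # file .bat
# command = "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.exe"     # file .exe

# Start PowerShell from WSL and send its output to this script's stdout
p = subprocess.Popen(["powershell.exe", command], stdout=sys.stdout)
p.communicate()

# Pass the exit code on so the Airflow task fails when the Windows command fails
sys.exit(p.returncode)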
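If several tasks need to run different Windows commands, the PowerShell call can be factored into a small helper. This is a sketch rather than part of the original answer: `powershell_args` and `run_on_windows` are hypothetical names, `-NoProfile` and `-Command` are standard powershell.exe switches, and the working directory is assumed to contain no single quotes.

```python
import subprocess
from typing import Callable, List


def powershell_args(workdir: str, command: str) -> List[str]:
    """Build the argument list for running `command` from `workdir` in PowerShell."""
    # -NoProfile skips user profile scripts; -Command takes the script text.
    return [
        "powershell.exe",
        "-NoProfile",
        "-Command",
        "Set-Location '{}'; {}".format(workdir, command),
    ]


def run_on_windows(
    workdir: str,
    command: str,
    runner: Callable[..., int] = subprocess.call,
) -> int:
    """Run the command and return the exit code reported by the runner.

    `runner` defaults to subprocess.call and can be swapped out for testing.
    """
    return runner(powershell_args(workdir, command))
```

A caller script could then end with `sys.exit(run_on_windows("C:\\Users\\[Your_Users_Name]\\Desktop", ".\\file.exe"))`. A nonzero result signals failure to Airflow, although PowerShell may not preserve the program's exact exit code unless the command ends with `exit $LASTEXITCODE`.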

How do you know whether your code works in Airflow? If the task runs successfully, it works.
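If the task does not run, comparing what an interactive shell sees with what the task sees may help. The sketch below is a minimal diagnostic under two assumptions not stated in the original: that WSL 2 exposes its Windows interop socket through the `WSL_INTEROP` environment variable, and that `powershell.exe` is located through `PATH`. `interop_report` is a hypothetical helper name.

```python
import os
import shutil
from typing import Dict, Mapping, Optional


def interop_report(env: Optional[Mapping[str, str]] = None) -> Dict[str, Optional[str]]:
    """Collect the settings most likely to differ between a shell and a task."""
    env = os.environ if env is None else env
    return {
        # Assumed to be set by WSL 2 when Windows interop is available
        "WSL_INTEROP": env.get("WSL_INTEROP"),
        # Full path of powershell.exe if it can be found on PATH, else None
        "powershell_on_path": shutil.which("powershell.exe", path=env.get("PATH")),
        # Working directory of the current process
        "cwd": os.getcwd(),
    }
```

Printing `interop_report()` from a terminal and from a small Python script started by a BashOperator gives two dictionaries to compare; a value present in the first but missing in the second points at what the task environment lacks.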
