airflow - 气流调度程序无法通过 WSL 执行 Windows EXE
问题描述
我的 Windows 10 机器在 WSL 2 (Ubuntu-20.04) 中安装了 Airflow 1.10.11。
我有一个 BashOperator 任务,它在 Windows 上调用 .EXE(通过 /mnt/c/... 或通过符号链接)。任务失败。日志显示:
[2020-12-16 18:34:11,833] {bash_operator.py:134} INFO - Temporary script location: /tmp/airflowtmp2gz6d79p/download.legacyFilesnihvszli
[2020-12-16 18:34:11,833] {bash_operator.py:146} INFO - Running command: /mnt/c/Windows/py.exe
[2020-12-16 18:34:11,836] {bash_operator.py:153} INFO - Output:
[2020-12-16 18:34:11,840] {bash_operator.py:159} INFO - Command exited with return code 1
[2020-12-16 18:34:11,843] {taskinstance.py:1150} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.8/dist-packages/airflow/operators/bash_operator.py", line 165, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2020-12-16 18:34:11,844] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=test-dag, task_id=download.files, execution_date=20201216T043701, start_date=20201216T073411, end_date=20201216T073411
就是这样。返回1
没有更多有用信息的代码。
通过 bash 运行相同的 EXE 可以完美运行,没有错误(我也在我自己的程序上尝试过它,它向控制台发出一些东西 - 在 bash 中它发出的东西很好,但通过airflow scheduler
它是相同的错误 1)。
我为排除任何其他问题所做的更多数据和事情:
airflow scheduler
以 root 身份运行。我还通过在我的 BashOperator 中输入root
一个命令来确认它在上下文中运行,该命令确实发出了(我还应该注意,所有本机 Linux 程序都运行得很好!只有 Windows 程序没有。)whoami
root
- 我正在尝试执行的 Windows EXE 及其目录具有完整的“所有人”权限(当然,在我自己的程序上,我不敢在我的 Windows 文件夹上执行它——这只是一个例子。)
- 通过 /mnt/c 以及通过符号链接访问时都会发生故障。在符号链接的情况下,符号链接具有 777 权限。
- 我尝试
airflow test
在 BashOperator 任务上运行 - 它运行完美 - 向控制台发出输出并返回 0(成功)。 - 尝试了各种 EXE 文件 - “本机”(例如 Windows 附带的)以及我的 C# 制作的程序。完全一样的行为。
- 在 Airflow 的 GitHub 存储库和 Stack Overflow 中都没有发现任何类似的问题。
问题是:Airflow 对子进程(气流调度程序用于运行 Bash Operators)的 Python 使用与“普通”Bash 有何不同,导致error 1
?
解决方案
你可以使用 Python 和 PowerShell 的库 subprocess 和 sys
在 Airflow > Dags 文件夹中,创建 2 个文件:main.py 和 caller.py
因此,main.py 调用 caller.py 和 caller.py 进入机器(Windows)运行文件或例程。
这是过程:
代码 Main.py:
# Importing the libraries we are going to use in this example
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
# Defining some basic arguments
default_args = {
'owner': 'your_name_here',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'retries': 0,
}
# Naming the DAG and defining when it will run (you can also use arguments in Crontab if you want the DAG to run for example every day at 8 am)
with DAG(
'Main',
schedule_interval=timedelta(minutes=1),
catchup=False,
default_args=default_args
) as dag:
# Defining the tasks that the DAG will perform, in this case the execution of two Python programs, calling their execution by bash commands
t1 = BashOperator(
task_id='caller',
bash_command="""
cd /home/[Your_Users_Name]/airflow/dags/
python3 Caller.py
""")
# copy t1, paste, rename t1 to t2 and call file.py
# Defining the execution pattern
t1
# comment: t1 execute and call t2
# t1 >> t2
代码调用者.py
import subprocess, sys
p = subprocess.Popen(["powershell.exe"
,"cd C:\\Users\\[Your_Users_Name]\\Desktop; python file.py"] # file .py
#,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.html"] # file .html
#,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.bat"] # file .bat
#,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.exe"] # file .exe
, stdout=sys.stdout
)
p.communicate()
如何知道您的代码是否可以在气流中工作,如果运行,就可以了。
推荐阅读
- c++ - 带有指针字符串的 C++ 二维动态数组
- python - 使用插值清除列内的异常值
- r - 我的所有变量的显着性水平为 1
- azure-devops - Azure DevOps 禁用“删除分支”选项的权限
- java - 用于嵌套对象转换的 Jolt JSON 规范
- sql - AWS Athena 为什么单行或其列的大小不能超过 32 MB 错误 select * from tableName 但不是在 where 条件
- c# - System.Text.Json 不能用对象数组反序列化一个对象?
- javascript - google.script.host.close 不是函数,如何解决?
- python - 通过 AWS Cloudwatch Syntetic Canary 函数获取具有身份验证的 URL
- python - Python:如何在文本框中从单词中读取表格?