python - Airflow - 通过 API 调用 DAG 并在大多数方法中传递参数
问题描述
我有一个用例通过 AIRFLOW API 在 DAG 气流中调用 python 脚本。这个 python 脚本将通过 JSON 接收参数。我可以在 shell 上运行 python 并按预期返回结果。
我的 API POST 如下 -
curl -X POST -H "Cache-Control: no-cache" -H "Content-Type: application/json" http://localhost:8080/api/experimental/dags/DAG-3/dag_runs -d '{"conf":"{\"hostname\":\"<servername>\", \"username\":\"<username>\", \"password\":\"password\", \"command1\":\"hostname\" }"}'
我的 DAG 如下:
dag = DAG(
dag_id='DAG-3',
default_args=default_args,
dagrun_timeout=timedelta(minutes=10)
)
#cmd_command = "python3.6 /root/test21.py '{\"hostname\": \"json_data(hostname)\", \"username\":\"json_data(username)\", \"password\":\"json_data(password)\", \"command1\":\"json_data(command)\"}'"
cmd_command = "python3.6 /root/test21.py '{{ hostname }}' '{{ username }}' '{{ password }}' '{{ command1 }}'"
t = BashOperator(
task_id = 'execute_script',
bash_command = cmd_command,
dag = dag)
有了这个,我看到 API POST 的参数没有被选中,我这样做是否正确。当我对cmd_command
DAGS 中的包进行硬编码时,我不确定如何通过 POST 将其作为 API 传递
请帮忙。
解决方案
conf 需要被dag_run.conf
.
cmd_command = "python3.6 /root/test21.py '{{ dag_run.conf.hostname }}' '{{ dag_run.conf.username }}' '{{ dag_run.conf.password }}' '{{ dag_run.conf.command1 }}'"
t = BashOperator(
task_id = 'execute_script',
bash_command = cmd_command,
dag = dag)
推荐阅读
- python - 在python中查找两种时间格式之间的持续时间?
- java - BindingResult 不起作用
- android - Android textview 可水平滚动
- x86 - UEFI 引导加载程序中的 SPI 寄存器读写应用程序
- visual-studio - 使用 VS 2017 中的 cl.exe 编译 arm 或 arm64 程序时缺少 mspdbcore.dll
- swagger-ui - Swagger UI 的 requestInterceptor 中的 Promise
- react-native - 为什么使用 this.setState 会导致超过最大更新深度?
- javascript - Javascript 未显示所需的值
- mysql - 如何从别名中找到mysql中的总数(总和)?
- java - 使用BLE同步多个设备的时间