首页 > 解决方案 > Airflow - 通过 API 调用 DAG 并在大多数方法中传递参数

问题描述

我有一个用例通过 AIRFLOW API 在 DAG 气流中调用 python 脚本。这个 python 脚本将通过 JSON 接收参数。我可以在 shell 上运行 python 并按预期返回结果。

我的 API POST 如下 -

curl -X POST -H "Cache-Control: no-cache" -H "Content-Type: application/json" http://localhost:8080/api/experimental/dags/DAG-3/dag_runs -d '{"conf":"{\"hostname\":\"<servername>\", \"username\":\"<username>\", \"password\":\"password\", \"command1\":\"hostname\" }"}'

我的 DAG 如下:

dag = DAG(
    dag_id='DAG-3',
    default_args=default_args,
    dagrun_timeout=timedelta(minutes=10)
    )


#cmd_command = "python3.6 /root/test21.py '{\"hostname\": \"json_data(hostname)\", \"username\":\"json_data(username)\",  \"password\":\"json_data(password)\", \"command1\":\"json_data(command)\"}'"
cmd_command = "python3.6 /root/test21.py '{{ hostname }}' '{{ username }}' '{{ password }}' '{{ command1 }}'"


t = BashOperator(
     task_id = 'execute_script',
     bash_command = cmd_command,
     dag = dag)

有了这个,我看到 API POST 的参数没有被选中,我这样做是否正确。当我对cmd_commandDAGS 中的包进行硬编码时,我不确定如何通过 POST 将其作为 API 传递

请帮忙。

标签: pythonairflow

解决方案


conf 需要被dag_run.conf.

cmd_command = "python3.6 /root/test21.py '{{ dag_run.conf.hostname }}' '{{ dag_run.conf.username }}' '{{ dag_run.conf.password }}' '{{ dag_run.conf.command1 }}'"


t = BashOperator(
     task_id = 'execute_script',
     bash_command = cmd_command,
     dag = dag)

推荐阅读