首页 > 解决方案 > 在 Airflow 2.0 中使用 Taskflow API 传递争论

问题描述

我正在使用 REST API 将参数传递给基于任务流的 Dag。看看这个论坛上提出的类似问题,似乎下面是访问传递参数的常用方法。

#From inside a template field or file:
{{ dag_run.conf['key'] }}
#Or when context is available, e.g. within a python callable of the PythonOperator:
context['dag_run'].conf['key'] 

我试图获取上下文字典

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), params=None)
def classic_xgb(**context):
    """
    ### TaskFlow API Tutorial Documentation
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        print("context is ", context)

输出是 <airflow.models.dagparam.DagParam object at 0x7f735c634510> 现在我如何获得作为输入参数传递给 Dag 的 conf 字典?我需要在我的 python 代码中使用争论,所以模板选项对我来说似乎不可行。

非常感激任何的帮助。

谢谢

此致,

阿德尔

标签: airflow

解决方案


Airflow 2.0 中有一个新功能get_current_context()可以获取上下文。获得上下文字典后,“params”键包含通过 REST API 发送到 Dag 的参数。下面的代码解决了这个问题。

from airflow.operators.python import task, get_current_context

default_args = {
    'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def classic_xgb(**kwargs):

    """
    @task()
    def extract():
        context = get_current_context()

推荐阅读