首页 > 解决方案 > 如何在计划间隔为无的情况下为未来的气流安排 dag 触发器?

问题描述

我有一个 dag,我想根据外部来源的某些输入来安排未来的日期。

    {
        "run_id":"run-eventId109bnfghjasdmajjsd1basdasdaaasdsdasdsk2",
        "conf":{
            "softwareId":"something"
        },
        "execution_date": "11-03T09:10:30" //could be any future date
    }

使用这个第一个任务总是停留在排队状态,即使当前时间已经达到 json 中提到的预定时间,也永远保持在该状态。是否有可能使用气流实现我想要做的事情。我只想在未来的某个日期提交任务,并在那个特定的日期时间开始实际执行 dag。

import random
import string

import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG(
    dag_id='xcomsDag',
    description='A simple tutorial DAG',
    schedule_interval=None,
    start_date=datetime(2020, 10, 30, 0, 0)
)

start = DummyOperator(task_id='run_this_first', dag=dag)
# arguments=["print('{}')".format("{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=[
                                    'import time; import logging; logging.warn("Hello World"); logging.warn("xcomm value:{}"); time.sleep(10); logging.warn("Printed after 100 seconds.")'.format(
                                        "{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
                                labels={"foo": "bar"},
                                name="passing-task",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag,
                                service_account_name="airflow-release"
                                )


def wait_for_approval(dag_run, **context):
    software_id = dag_run.conf.get("softwareId", "Some Software Id")
    print("polling status for softwareId:{}".format(software_id))
    response = requests.get('https://5f8582eec29abd0016190be2.mockapi.io/api/v1/status')
    print(response.json())
    context["task_instance"].xcom_push(key="approverId", value="visardan-" + random.choice(string.ascii_letters))
    return response.json()[0]['status']


wait_for_approval = PythonSensor(
    task_id="wait_for_approval",
    python_callable=wait_for_approval,
    dag=dag,
    poke_interval=30,
    provide_context=True,
    executor_config={"KubernetesExecutor": {"image": "apache/airflow:1.10.12-python3.6"}}
)

start >> wait_for_approval >> passing

在此处输入图像描述

标签: airflowairflow-scheduler

解决方案


您可以创建一个开始日期等于请求的运行时间并使用@once计划间隔的 DAG。

例如(计划在 2030 年的未来日期)

dag = DAG(
    dag_id='xcomsDag',
    description='A simple tutorial DAG',
    schedule_interval='@once',
    start_date=datetime(2030, 1, 1, 0, 0)
)

请注意,如果您需要安排多个未来时间,则不能使用此解决方案,除非您为每次运行创建一个 DAG。


推荐阅读