airflow - 如何在计划间隔为无的情况下为未来的气流安排 dag 触发器?
问题描述
我有一个 dag,我想根据外部来源的某些输入来安排未来的日期。
{
"run_id":"run-eventId109bnfghjasdmajjsd1basdasdaaasdsdasdsk2",
"conf":{
"softwareId":"something"
},
"execution_date": "11-03T09:10:30" //could be any future date
}
使用这个第一个任务总是停留在排队状态,即使当前时间已经达到 json 中提到的预定时间,也永远保持在该状态。是否有可能使用气流实现我想要做的事情。我只想在未来的某个日期提交任务,并在那个特定的日期时间开始实际执行 dag。
import random
import string
import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
dag = DAG(
dag_id='xcomsDag',
description='A simple tutorial DAG',
schedule_interval=None,
start_date=datetime(2020, 10, 30, 0, 0)
)
start = DummyOperator(task_id='run_this_first', dag=dag)
# arguments=["print('{}')".format("{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
passing = KubernetesPodOperator(namespace='default',
image="python:3.6",
cmds=["python", "-c"],
arguments=[
'import time; import logging; logging.warn("Hello World"); logging.warn("xcomm value:{}"); time.sleep(10); logging.warn("Printed after 100 seconds.")'.format(
"{{ task_instance.xcom_pull(task_ids='wait_for_approval', key='approverId') }}")],
labels={"foo": "bar"},
name="passing-task",
task_id="passing-task",
get_logs=True,
dag=dag,
service_account_name="airflow-release"
)
def wait_for_approval(dag_run, **context):
software_id = dag_run.conf.get("softwareId", "Some Software Id")
print("polling status for softwareId:{}".format(software_id))
response = requests.get('https://5f8582eec29abd0016190be2.mockapi.io/api/v1/status')
print(response.json())
context["task_instance"].xcom_push(key="approverId", value="visardan-" + random.choice(string.ascii_letters))
return response.json()[0]['status']
wait_for_approval = PythonSensor(
task_id="wait_for_approval",
python_callable=wait_for_approval,
dag=dag,
poke_interval=30,
provide_context=True,
executor_config={"KubernetesExecutor": {"image": "apache/airflow:1.10.12-python3.6"}}
)
start >> wait_for_approval >> passing
解决方案
您可以创建一个开始日期等于请求的运行时间并使用@once
计划间隔的 DAG。
例如(计划在 2030 年的未来日期):
dag = DAG(
dag_id='xcomsDag',
description='A simple tutorial DAG',
schedule_interval='@once',
start_date=datetime(2030, 1, 1, 0, 0)
)
请注意,如果您需要安排多个未来时间,则不能使用此解决方案,除非您为每次运行创建一个 DAG。
推荐阅读
- google-sheets - 如何组合 2 个查询导致来自 Google 表格的单个查询
- javascript - 如何在 asp.core MVC React 应用程序中注册 JS 引擎?
- python - django.db.utils.IntegrityError:安装夹具时出现问题
- java - 包含自身作为元素的 ArrayList 的哈希码
- python - 如何根据特定键比较python字典
- c++ - 执行用 C++ 编写的 .exe 的问题(使用 mingw 编译器)
- python - 将 DataFrame 中的每个唯一值重塑为列
- php - $row 将在此返回什么
- regex - 我如何将变量多行 perl 正则表达式与不同的规则匹配
- c# - 如何有条件地从 ASP.NET Core 注册中删除控制器并添加到 ServiceCollection