首页 > 解决方案 > 带有 python_callable 集的 PythonOperator 不断执行

问题描述

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['jimin.park1@aig.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'start_date': airflow.utils.dates.days_ago(0)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)

t1 = PythonOperator(
    task_id='Task1',
    provide_context=True,
    python_callable=some_task,
    dag=dag
)

实际的 some_task 本身只是将时间戳附加到某个文件。正如您在 dag 配置文件中看到的,任务本身被配置为每 1 分钟运行一次。

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')

我只是在没有运行调度程序的情况下拖尾 -f 输出文件并启动了网络服务器。当网络服务器启动时,这个函数被调用并且东西被附加到文件中。当我启动调度程序时,在每个执行循环中,文件都会被附加。

我想要的是按预期每分钟执行该函数,而不是每个执行循环。

标签: airflowairflow-scheduler

解决方案


调度程序将在每个调度程序循环中运行每个 DAG 文件,包括所有导入语句。

导入函数的文件中是否有任何正在运行的代码?


推荐阅读