airflow - 带有 python_callable 集的 PythonOperator 不断执行
问题描述
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['jimin.park1@aig.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': airflow.utils.dates.days_ago(0)
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)
t1 = PythonOperator(
task_id='Task1',
provide_context=True,
python_callable=some_task,
dag=dag
)
实际的 some_task 本身只是将时间戳附加到某个文件。正如您在 dag 配置文件中看到的,任务本身被配置为每 1 分钟运行一次。
def some_task(ds, **kwargs):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("test.txt", "a") as myfile:
myfile.write(current_time + '\n')
我只是在没有运行调度程序的情况下拖尾 -f 输出文件并启动了网络服务器。当网络服务器启动时,这个函数被调用并且东西被附加到文件中。当我启动调度程序时,在每个执行循环中,文件都会被附加。
我想要的是按预期每分钟执行该函数,而不是每个执行循环。
解决方案
调度程序将在每个调度程序循环中运行每个 DAG 文件,包括所有导入语句。
导入函数的文件中是否有任何正在运行的代码?
推荐阅读
- javascript - 如何在ionic中将视频文件url转换为base64?
- rest - 如何将根端点映射到 ASP.NET Core 控制器?
- cassandra - LeveledCompactionStrategy:调整 sstable_size_in_mb 有什么影响?
- java - akka 记录死信 - 为什么是 INFO?我想要它作为错误
- dc.js - dc.js/crossfilter 中的“静态”过滤器
- python - FROM 命令在 Dockerfile 中有什么作用?
- javascript - Vuejs从url缓存图片,避免重新获取
- php - Symfony 3 - 为 FormType 添加样式(Entity::class => choiceLabels)
- python - 识别时间序列数据中数据点低于设定值的窗口
- python-3.x - 如何使用 Python Flask 动态创建表单