airflow - 如果先前的任务执行需要更多时间,则会跳过气流计划
问题描述
我的气流 DAG 中有两个任务。一个触发 API 调用(Http 运算符),另一个使用另一个 api(Http 传感器)不断检查其状态。此 DAG 计划每小时和 10 分钟运行一次。但有时一次执行可能需要很长时间才能完成,例如 20 小时。在这种情况下,前一个任务正在运行时的所有计划都不会执行。
例如,假设我 01:10 的工作需要 10 个小时才能完成。计划 02:10, 03:10, 04:10, ... 11:10 等应该运行的计划被跳过,只有 12:10 的计划被执行。
我正在使用本地执行程序。我正在使用以下脚本运行气流服务器和调度程序。
start_server.sh
export AIRFLOW_HOME=./airflow_home;
export AIRFLOW_GPL_UNIDECODE=yes;
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
airflow initdb;
airflow webserver -p 7200;
start_scheduler.sh
export AIRFLOW_HOME=./airflow_home;
# Connection string for connecting to REST interface server
export AIRFLOW_CONN_REST_API=http://localhost:5000;
export AIRFLOW_CONN_MANAGEMENT_API=http://localhost:8001;
#export AIRFLOW__SMTP__SMTP_PASSWORD=**********;
airflow scheduler;
my_dag_file.py
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': admin_email_ids,
'email_on_failure': False,
'email_on_retry': False
}
DAG_ID = 'reconciliation_job_pipeline'
MANAGEMENT_RES_API_CONNECTION_CONFIG = 'management_api'
DA_REST_API_CONNECTION_CONFIG = 'rest_api'
recon_schedule = Variable.get('recon_cron_expression',"10 * * * *")
dag = DAG(DAG_ID, max_active_runs=1, default_args=default_args,
schedule_interval=recon_schedule,
catchup=False)
dag.doc_md = __doc__
spark_job_end_point = conf['sip_da']['spark_job_end_point']
fetch_index_record_count_config_key = conf['reconciliation'][
'fetch_index_record_count']
fetch_index_record_count = SparkJobOperator(
job_id_key='fetch_index_record_count_job',
config_key=fetch_index_record_count_config_key,
exec_id_req=False,
dag=dag,
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_job',
data={},
method='POST',
endpoint=spark_job_end_point,
headers={
"Content-Type": "application/json"}
)
job_endpoint = conf['sip_da']['job_resource_endpoint']
fetch_index_record_count_status_job = JobStatusSensor(
job_id_key='fetch_index_record_count_job',
http_conn_id=DA_REST_API_CONNECTION_CONFIG,
task_id='fetch_index_record_count_status_job',
endpoint=job_endpoint,
method='GET',
request_params={'required': 'status'},
headers={"Content-Type": "application/json"},
dag=dag,
poke_interval=15
)
fetch_index_record_count>>fetch_index_record_count_status_job
SparkJobOperator
&JobStatusSensor
我的自定义类扩展SimpleHttpOperator
& HttpSensor
。
如果我设置depends_on_past
true
它会按预期工作吗?我对此选项的另一个问题是状态检查作业有时会失败。但是下一个时间表应该会触发。我怎样才能实现这种行为?
解决方案
我认为这里的主要讨论点是你设置的是什么catchup=False
,更多细节可以在这里找到。所以气流调度程序将跳过那些任务执行,你会看到你提到的行为。
如果前一个过程花费的时间比预期的要长,这听起来像是您需要执行追赶。你可以尝试改变它catchup=True
推荐阅读
- powershell - 如何使用PowerShell将文件从本地工作空间复制到远程服务器(不是网络共享路径)
- web - 需要从多个来源备份
- html - 如何在 CSS 中创建一个充满可变大小文本的完美圆形
- sql - 好奇是否有任何方法可以根据除工会之外的一天的每周分类来计算总数
- cytoscape.js - 边缘数据格式
- php - 我在 Android WebView URL 中遇到错误
- vue.js - 如何在开玩笑测试中验证 window.location.href
- api - 隐藏从提琴手或其他调试代理应用程序获取请求的请求/响应标头
- c++ - 将随机数放入缓冲区以写入文件的有效方法是什么?
- ruby-on-rails - 如何防止我的 Rails 控制器方法返回 nil 到我的视图?