首页 > 解决方案 > 如何从气流中的变量写入/读取时间戳?

问题描述

我正在使用: EXEC_DATE = '{{ macros.ds_add(ds, 1) }}'这给了我执行日期,而不是小时。我希望能够将此值保存为YYYY-MM-DD HH:MM名为process_last_run.

基本上在运行开始时读取变量并在 dag 结束时写入它。此变量指示最后一个 dag 的运行时间。

我怎样才能做到这一点?

标签: airflow

解决方案


您可以使用宏来做到这一点execution_date。但是请注意,这是气流中一个名称不佳的概念。它表示预定间隔周期的开始。即使手动重新运行任务,它也不会在同一个 dag-run 中更改。它在那里支持幂等数据更新。坦率地说,这是处理数据管道的最佳方式。在您的情况下,尽管您在其他地方说过您的数据获取 api 需要一个开始日期并提供最新的所有数据,这不利于被幂等处理,尽管您可以在指定的截止日期后丢弃数据。

因此,您可能只是在数据处理完成后获取日期,并将其存储以备后用。您可以存储到气流变量中。您可能会注意到,尽管您退出下面显示的日期命令的时间将晚于您可能从您的 process_data api 调用中获得的数据的最后时间,以获取从开始日期开始的所有数据。因此,如果您的处理步骤将处理数据的实际最后日期和时间输出为标准输出的最后一行(由 BashOperator for xcom 捕获),则可能会更好。

例如

from airflow.models import Variable, DAG
from datetime import datetime

def pyop_fun(**context):
  # You could have used execution_date here and in the next operator
  # to make the operator rerun safe.
  # date_string = context['execution_date'].strftime('%Y-%m-%d %H:%M')
  # But elsewhere you said your api is always giving you the up-to-the-minute data.
  # So maybe getting the date from the prior task would work better for you.
  Variable.set(
    'process_last_run',
    context['task_instance'].xcom_pull(task_ids='process_data')

with dag as DAG(…):
  pyop = PythonOperator(
    task_id='set_process_last_run',
    callable=pyop_fun,
    provide_context=True, …)
  shop = BashOperator(
    task_id='process_data',
    bash_command='''
      process_data "{{var.value.process_last_run}}";
      date -u +%Y-%m-%d\ %H:%M''',
    xcom_push=True, …)
  shop >> pyop

# Because the last output line of a BashOperator is pushed into xcom for that
# task id with the default key, it can be pulled by the PythonOperator and 
# stored in a variable.

推荐阅读