python-3.x - 使用不同的日期参数重复气流 DAG 以进行数据迁移
问题描述
对于数据迁移,我创建了一个 DAG,它最终在所有具有所需逻辑的任务之后将数据插入到迁移表中。
DAG 有一个类似于下面的 sql,它最初提取数据并提供给其他任务:
sql=" select col_names from tables where created_on >=date1 and created_on <=date2"
对于每个 DAG 运行,我在上面的 sql 中手动更改 date1 和 date2 并启动数据迁移(由于数据块很重,截至目前日期范围长度为 1 周)。
我只想自动化这个日期更改过程,例如,如果我给出日期间隔,在第一个 DAG 运行后,第二次运行开始,依此类推,直到结束日期间隔。
到目前为止我已经研究过,我得到的一个解决方案是气流中的动态 DAGS。但问题是它创建了多个 DAG 文件实例,而且它也很难调试和维护。
有没有办法通过更改日期参数重复 DAG,以便我不再需要手动更改日期。
解决方案
我有类似的请求,这里是我如何访问稍后可在 SQL 中用于回填的日期。
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from datetime import datetime, timedelta
# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 8, 1),
'end_date': datetime(2020, 8, 3),
'retries': 0,
}
dag = DAG('helloWorld_v1', default_args=default_args, catchup=True, schedule_interval='0 1 * * *')
def print_dag_run_date(**kwargs):
print(kwargs)
execution_date = kwargs['ds']
prev_execution_date = kwargs['prev_ds']
return (execution_date, prev_execution_date)
# t1, t2 are examples of tasks created using operators
bash = BashOperator(
task_id='bash',
depends_on_past=True,
bash_command='echo "Hello World from Task 1"',
dag=dag)
py = PythonOperator(
task_id='py',
depends_on_past=True,
python_callable=print_dag_run_date,
provide_context=True,
dag=dag)
py.set_upstream(bash)
推荐阅读
- jmeter - 如何在 JMeter 中将请求分散到特定时间
- javascript - 如何从多个类别中获取所有产品(Woocommerce/Wordpress/React)
- python-3.x - AWS Lambda 容器映像中的 OpenCV
- c++ - 为什么字符串'text2'的输出为空白?
- java - Java swing 渲染问题卡片组黑色图像
- ios - Flutter IOS 从 xcode 生成应用程序发布
- python - 当我有连接的节点时,无法检索未连接的节点对
- mysql - 如何获取每天结果集的百分比?
- excel - 有没有办法将字符串传递给 Average 函数?
- visual-studio-code - VSCode nuxtjs css "wi" emmet 缩写自动补全 "widows"