python - 如何根据前一个任务的结果在 SubDAG 中真正创建 n 个任务
问题描述
我正在使用 SubDAG 在 Airflow 中创建一个动态 DAG。我需要的是 SubDAG 中的任务数量由前一个任务的结果确定(函数的subtask_ids
变量middle_section
应该是函数的相同变量initial_task
)。
问题是我无法访问xcom
a 的 subdag 函数,SubDagOperator
因为我没有任何上下文。此外,由于调度程序的自动发现 DAG 功能,我无法访问任何数据库来读取某些值:middle_section
每隔几秒执行一次。
你们怎么解决这个问题?根据先前任务的结果在 SubDAG 中创建动态数量的任务?
这是我正在开发的代码:
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
def initial_task(**context):
subtask_ids = [0, 1, 2]
task_instance = context['ti']
task_instance.xcom_push(key='depot_ids', value=subtask_ids)
def middle_section_task(subtask_id):
print(subtask_id)
def middle_section(parent_dag, arg):
subdag = DAG(dag_id=f'{dag.dag_id}.middle',
default_args=args, schedule_interval='@once')
subtask_ids = '' # Read from xcom
for subtask_id in subtask_ids:
PythonOperator(task_id=f'{dag.dag_id}.middle_section_task_{subtask_id}',
python_callable=middle_section_task,
op_kwargs={'subtask_id': subtask_id}, dag=subdag)
return subdag
def end_task(**context):
print('Finished')
dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)
initial = PythonOperator(task_id='start_task', python_callable=initial_task,
provide_context=True, dag=dag)
middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
default_args=args, dag=dag)
end = PythonOperator(task_id='end_task', python_callable=end_task,
provide_context=True, dag=dag)
initial >> middle >> end
解决方案
我有同样的问题,我无法以“气流方式”正确解决 100% 的问题,因为我认为气流任务和子任务的数量是在 DAG 验证时定义的。并且在验证时没有运行任何任务,因此气流无法事先知道将安排多少 subdag.tasks。
我绕过这个问题的方式可能不是最好的(我愿意接受建议),但它确实有效:
main_dag.py
# imports omitted for brevity
def get_info_from_db():
# get info from db or somewhere else, this info will define the number of subdag tasks to run
return urls, names
dag = DAG(...)
urls, names = get_info_from_db()
# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
task_id='import-file',
subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
default_args=args,
dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)
start.set_downstream(sub_section)
section_1.set_downstream(end)
然后最后我有我的 subdag.py (确保它可以从气流中发现)以防它位于单独的文件中
# imports omitted for brevity
def fetch_files(file_url, file_name):
# get file and save it to disk
return file_location
# this is how I get info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
ti = kwargs['ti']
task = 'fetch_file-{}'.format(task_id)
file_location = ti.xcom_pull(task_ids=task)
def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval="@daily",
)
for i in range(len(urls)):
# the task name should also be dynamic in order not to have duplicates
validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i+1),
python_callable=validate_file,
provide_context=True, dag=dag_subdag, op_kwargs={'task_id': i + 1})
fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i+1),
python_callable=fetch_zip, dag=dag_subdag,
op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
fetch_operator.set_downstream(validate_file_operator)
return dag_subdag
基本上我的逻辑是,在 Airflow 验证的那一刻,get_info_from_db()
所有 dag 和 subdag 都被正确地动态调度。如果我在数据库中添加或删除内容,要运行的任务数将在下一次 dag 验证中更新。
这种方法适合我的用例,但我希望将来 Airflow 能够原生支持此功能(动态数量的任务/subdag.tasks)。
推荐阅读
- python - 是否可以使用 Spark 从 Kubernetes 之外的外部 HBase 集群中读取数据进行处理?
- javascript - 向左移位在这里完成了什么?
- python - 使用已编译 python 中的值
- reactjs - 在不使用 react-apollo 的情况下应用 appsync 订阅的优化方法是什么?
- linux - SSH 后 shell 脚本停止(登录到 kubernetes pod)
- android - 对“jniRegisterNativeMethods”Android NDK 的未定义引用
- ruby-on-rails - NoMethodError(nil:NilClass 的未定义方法 `posts'):
- mysql - 姜戈。我如何获得重复的用户列表?
- javascript - 如何调用一组对象中同一列的属性?(Node.js)
- ios - 协议实现方法不在 Swift 中调用