python - Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行
问题描述
语境
我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建一个摄取管道。我在 BigQuery 中有一个表,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,那么 BigQuery 表中有 5 条记录。明天可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。
高层设计如下:
- 执行一个函数以从 BigQuery 获取数据作为 Pandas 数据帧(或 dict,两者都可以)
- 遍历数据框
- 对于数据框中的每一行,创建一个 DataProcSparkOperator ,其中包含有关文件和相应参数的详细信息
此设置运行良好。我可以在 Airflow UI 中看到我的 DAG 和所有动态生成的任务。
编辑:只需添加更多细节。BigQuery 表将包含少于 25 条记录,因此无需担心查询表。每 30 秒查询一次表。其次,我只需要这个 DAG 每 4 小时左右运行一次。我不打算让我的作曲家在那段时间继续运行。我需要每 4 小时启动 Composer,运行一次DAG以处理所有可用文件,然后关闭。
问题
在执行这些 DataProc 任务时,大约几分钟后,Airflow 会刷新 DAG 并再次运行同一组任务。在 DataProc Jobs 控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于运行状态。这是不可取的。
我试过的
我已经retries=0
在任务级别和 DAG 上设置了catchup=False
、max_active_runs=1
和schedule_interval='@once'
。DAG 的默认参数也有retries=0
.
我认为问题在于我从 BigQuery 中提取记录的部分是普通功能的一部分,而不是本身的任务。我没有把它放在一个任务中的原因是因为我找不到一个解决方案来将从 BigQuery 获取的结果传递到我必须循环它们的后续任务中。
我尝试调用 PythonOperator 并在其中执行Variable.set("df", df)
,希望可以循环,Variable.get("df")
但这也没有成功。
下面分享相关代码。
def fetch_pending_files_from_bq():
# fetch records from BigQuery and return as dataframe
default_args = {
'start_date': yesterday,
'default_timezone': 'utc',
'retries': 0
}
dag = DAG(
dagid,
default_args=default_args,
catchup=False,
max_active_runs=1,
description='DAG to ingest data',
schedule_interval='@once'
)
start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)
pending_files_df = fetch_pending_files_from_bq()
for index, row in pending_files_df.iterrows():
task = DataProcSparkOperator(
dag=dag,
task_id=row["file_name"],
arguments=dataproc_args,
region="us-east1",
job_name="job_{}".format(task_id),
dataproc_spark_jars=dataproc_jars,
....
....
)
task.set_upstream(start_dag)
task.set_downstream(end_dag)
我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。
任何想法表示赞赏。
解决方案
在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。
我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。
推荐阅读
- mongodb - 如何在猫鼬中找到特定值设置为true的最后一项
- c# - 在浏览器中查看 PDF 文档时出错
- java - 错误:客户端不支持服务器请求的认证协议;考虑升级 MySQL 客户端
- python - SpeechRecognition Python 包不听
- java - PDF 查看器放大和缩小
- javascript - 图像根据父 div 大小调整大小或裁剪
- php - Laravel 5.6 将变量传递给中间件查看
- angular - 滚动到从不同组件角度触发的特定部分 4
- owl - 在 OWL API 中的 OWL2DL 违规报告中检索 URI
- php - 如何正确拥有具有多个过滤器的搜索页面?