python - 无法查明 GCP Composer (Airflow) DAG 任务失败的问题
问题描述
我是使用 Apache Airflow 的新手。我的 dag 的一些操作员状态为失败。我试图了解错误的根源。
以下是问题的详细信息: 我的 dag 相当大,它的某些部分是由子 dag 组成的。我在 Composer UI 中注意到的是,失败的 Subdagstask_id
都在download_file
使用.XCom
GoogleCloudStorageDownloadOperator
>> GoogleCloudStorageDownloadOperator(
task_id='download_file',
bucket="sftp_sef",
object="{{task_instance.xcom_pull(task_ids='find_file') | first }}",
filename="/home/airflow/gcs/data/zips/{{{{ds_nodash}}}}_{0}.zip".format(table)
)
上述 Subdag 中的日志没有显示任何有用的信息。
日志 :
[2020-04-07 15:19:25,618] {models.py:1359} INFO - 依赖项都满足 [2020-04-07 15:19:25,660] {models.py:1359} INFO - 依赖项都满足[2020-04-07 15:19:25,660] {models.py:1577} 信息 -
-------------------------------------------------- ----------------------------- 开始尝试 10 次,共 1 次
[2020-04-07 15:19:25,685] {models.py:1599} INFO - 执行于 2020-04-06T11:44:31+00:00 [2020-04-07 15:19:25,685] {base_task_runner .py:118} 信息 - 运行:['bash', '-c', 'airflow run datamart_integration.consentement_email download_file 2020-04-06T11:44:31+00:00 --job_id 156313 --pool integration --raw -sd DAGS_FOLDER/datamart/datamart_integration.py --cfg_path /tmp/tmpacazgnve']
我不确定是否有我没有检查的地方......这是我的问题:
- 我一般如何调试 Composer DAG 中的错误
- 创建一个本地气流环境来在本地运行和调试我的 dags 是个好主意吗?
- 如何验证 XCom 中是否有错误?
解决方案
关于你的三个问题:
首先,在使用 Cloud Composer 时,您可以通过多种方式在代码中调试错误。根据文档,您应该:
- 检查气流日志。
这些日志与单个 DAG 任务相关。可以在 Cloud Storage 的日志文件夹和 Web Airflow 界面中查看它们。
当您创建 Cloud Composer 环境时,还会创建一个 Cloud Storage Bucket 并与之关联。因此,Cloud Composer 将单个 DAG 任务的日志存储在此存储桶内的日志文件夹中,每个工作流文件夹都有一个用于存放其 DAG 和子 DAG 的文件夹。你可以在这里查看它的结构。
关于 Airflow 网页界面,每 60 秒刷新一次。此外,您可以在此处查看更多信息。
- 查看 Google Cloud 的操作套件。
您可以将Cloud Monitoring和Cloud Logging与 Cloud Composer 结合使用。Cloud Monitoring 可让您了解基于云的应用的性能和整体运行状况,而 Cloud Logging 则显示调度程序和工作容器生成的日志。因此,您可以根据需要使用两者或仅使用您认为更有用的一个。
在 Cloud Console 中,检查页面上运行您的环境的 GCP 组件的错误。
在 Airflow Web 界面中,在 DAG 的图表视图中检查失败的任务实例。
因此,这些是在对 DAG 进行故障排除时推荐的步骤。
其次,关于测试和调试,建议您将生产环境和测试环境分开,避免DAG干扰。
此外,可以在本地测试您的 DAG,文档中有关于此主题的教程,请点击此处。本地测试允许您识别语法和任务错误。但是,我必须指出,无法检查/评估依赖关系以及与数据库的通信。
第三,一般来说,为了验证 Xcom 中的错误,您应该检查:
- 如果有任何错误代码/编号;
- 如果您的语法正确,请检查文档中的示例代码;
- 检查软件包是否已弃用;
我想指出的是,根据此文档,GoogleCloudStorageDownloadOperator的路径 已更新为GCSToLocalOperator。
此外,我还鼓励您看一下:检查 Xcom 语法和错误的代码和文档。
如果您需要进一步的帮助,请随时与我分享错误代码。
推荐阅读
- c# - 在 Visual Studio 2017 上没有 Locals 窗口
- python - 尝试从文件读取时无法连接
- angular - Ionic 4 到 5,angular 版本 4 到 11 迁移,ionic app-scripts watch 无法启动
- ios - 如何在 Flutter 中为每个蓝牙连接的唯一设备(汽车车辆识别号)实现 Apple 许可证订阅
- java - Java无法打开文件?
- esp32 - ESP32 OTA 无需向设备刷写代码
- python - 通过添加更多列来压缩数据框
- android - 如何在android中关闭导航抽屉
- python - Numpy:计算大数组的协方差
- django - Django ORM:误导`prefetch_related`的`first()`