首页 > 解决方案 > 通过从数据库中获取增量数据来触发 Airflow ETL 作业

问题描述

我想对 MongoDB 中的数据执行一些转换,并通过 Airflow DAG 将转换后的数据插入 ElasticSearch。虽然我可以编写运算符来获取数据并执行转换,但挑战是我只想在新数据插入集合并获取新数据进行转换时触发 DAG。我已经阅读了有关传感器的信息,但我还没有找到具体的解决方案。有人可以建议吗?

编辑:对于没有正确解释问题,我深表歉意。

以下是我必须做的任务: 1. 每当将新文档插入 MongoDB 时,获取文档并进行一些 JSON 转换。2. 将转换后的 JSON 数据流式传输到下一个完成进一步处理的任务,并将数据插入 Elasticsearch 索引。

我只获取新数据的想法是: 1. 将唯一的增量 batch_id 与 MongoDB 中的数据一起插入。2. 获取前一个作业的最大 batch_id 并戳 MongoDB 以获得递增的 batch_id。3. 如果新增加的batch_id 存在,则获取数据。

上述想法有效,但我不确定这是否是正确的方法。

目前的管道如下:

第一项任务是传感器

标签: mongodbairflow

解决方案


推荐阅读