mongodb - 通过从数据库中获取增量数据来触发 Airflow ETL 作业
问题描述
我想对 MongoDB 中的数据执行一些转换,并通过 Airflow DAG 将转换后的数据插入 ElasticSearch。虽然我可以编写运算符来获取数据并执行转换,但挑战是我只想在新数据插入集合并获取新数据进行转换时触发 DAG。我已经阅读了有关传感器的信息,但我还没有找到具体的解决方案。有人可以建议吗?
编辑:对于没有正确解释问题,我深表歉意。
以下是我必须做的任务: 1. 每当将新文档插入 MongoDB 时,获取文档并进行一些 JSON 转换。2. 将转换后的 JSON 数据流式传输到下一个完成进一步处理的任务,并将数据插入 Elasticsearch 索引。
我只获取新数据的想法是: 1. 将唯一的增量 batch_id 与 MongoDB 中的数据一起插入。2. 获取前一个作业的最大 batch_id 并戳 MongoDB 以获得递增的 batch_id。3. 如果新增加的batch_id 存在,则获取数据。
上述想法有效,但我不确定这是否是正确的方法。
目前的管道如下:
解决方案
推荐阅读
- jquery - 无法在添加事件全日历插件上设置属性“显示”
- python - 如何使用 openpyxl 加入来自不同工作表的列?
- reactjs - 如何编辑待办事项列表?
- python - 安全地将附加属性修补到 Django HttpRequest
- javascript - 为什么单击 iframe 上的 x 时无法暂停视频
- javascript - 删除 b Javascript 中存在的 a 中的元素
- notepad++ - 如何在 Notepad++ 中的 HTML 属性名称后自动插入等号 (=) 和双引号 ("")?
- python - 将 1H 蜡烛数据的 pandas 数据帧重采样为 4H
- database - Odoo - 如何使用 OpenUpgrade 将数据库从 Odoo 12 迁移到 14
- javascript - 将特定键的数据从 javascript 传递给 vue