首页 > 解决方案 > 循环运行 Airflow 任务直到完成

问题描述

我正在构建一个基本的 ETL 管道,该管道到达一个主端点,该端点包含一个用于处理的 ID 列表(每次调用的可变数量)。我目前的想法是使用 RabbitMQ 作为队列系统,并从 RabbitMQ 消耗三个任务(提取、转换、加载)。我在网上看到的大多数教程都展示了任务在退出之前的简单顺序执行。我尝试构建一个 DAG,对我们收到的每个 ID 执行此顺序操作。但是当我不知道存在多少个 ID 时,我在试图弄清楚如何通过气流安排所有这些任务时遇到了问题。

这是相关 DAG 的一般树视图:

ETL 树视图

我已经开始使用 RabbitMQ 将这些 ID 推送到队列中,并让 celery 处理可变数量的工作人员来处理负载。我遇到的问题是我不知道如何打破“消费”循环。例如(我使用伪代码对 RabbitMQ 进行一些抽象):

def extract():
    # callback function when messages are sent to this worker
    def _extract(channel, url, rest):
        resp = request.get(url)
        channel.publish('transform_queue', resp)

    # attach the callback to the queue
    channel.basic_consume('extract_queue', callback=_extract)
    channel.start_consuming() # runs a pseudo loop waiting for messages here

请注意,一些变量(例如下面的通道_extract是隐式的,但很可能会包装在自定义运算符中。

Load 和 Transform 函数的工作方式类似。我遇到的问题是当函数开始使用它时它不会停止,直到它关闭。我已经能够发送哨兵消息以允许该功能“退出”,但这将导致任务被标记为失败,并被发送重试。例如,这里是哨兵关闭的代码。

def extract():
    # callback function when messages are sent to this worker
    def _extract(channel, message, rest):
        if message == SHUTDOWN:
            exit()
        
        resp = requests.get(message.url)
        channel.publish('transform_queue', resp)

    # attach the callback to the queue
    channel.basic_consume('extract_queue', callback=_extract)
    channel.start_consuming() # runs a pseudo loop waiting for messages here

还有选择性地取消消费者的选项,但是这只会增加更多的复杂性,因为仍然存在轮询取消的问题,然后该任务最终会遇到上述相同的问题。

我的主要问题是:

有没有办法在这个设置中成功退出?

这是解决这个问题的最佳方法吗?我想这是气流的常见用例,因此必须有一些最佳实践或常见设置。但是,我一直无法找到它。

标签: pythonrabbitmqceleryairflowetl

解决方案


我可以从您的问题中理解以下内容,因此建议您探索以下内容。

您不确定输入的数量,因此不确定您想要运行流程的次数

您可以创建一个自定义运算符(例如 FindIDs),它首先找出您需要执行多少个 ID,然后将值推送到 XComs。然后这些消息可以用于您的其他功能(例如提取器、转换器和加载器),并且可以按如下顺序设置它们

start >> findIds >> extractor >> transformer >> loader >> end

检查https://airflow.apache.org/docs/apache-airflow/stable/concepts.html?#xcoms

如果没有输入(或您的情况下的 ID),您需要跳过某些执行

在这种情况下,我会使用 ShortCircuitOperator 并有条件地跳过 DAG 的执行。检查:https ://github.com/apache/airflow/blob/master/airflow/example_dags/example_short_circuit_operator.py


推荐阅读