首页 > 解决方案 > 无法使用气流插件运行气流任务

问题描述

我有 gcloud composer 气流实例和自定义气流网络插件,它们在特定端点上运行单独的作业。Airflow 版本是 1.14(1.14+云端的composer)

@rest_api_decorator('/run_task', "POST")
def trigger_tasks(self):
        dag_id = Util.get_argument(request, 'dag_id')
        task_id = Util.get_argument(request, 'task_id')
        subdir = Util.get_argument(request, 'subdir')
        execution_date = parsedate(Util.get_argument(request, 'execution_date'))

        dag = self.get_dag(subdir, dag_id)

        task = dag.get_task(task_id=task_id)
        ti = TaskInstance(task, execution_date)
        ti.refresh_from_db()
        ti_list.append(ti)

        executor = get_default_executor()
        executor.start()
        executor.queue_task_instance(
                ti,
                ignore_all_deps=True)
        executor.heartbeat()
        executor.end()

        result = {
            'dag_id': dag_id,
            'task_id': task_id_list
        }

        return Util.get_response(result)

上面的清单是 Web 插件端点的示例。此代码使用 celery 执行器在我的本地实例上流畅运行,但由于某种原因它在云中不起作用。从日志文件中我可以清楚地看到任务正在添加到队列中:

2021-04-28T16:17:04.962251133Zairflow-webserver [2021-04-28 16:17:01,054] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'tutorial', 'print_date', '2021-04-23T09:18:11+00:00', '-A', '--local', '--pool', 'default_pool', '-sd', 'DAGS_FOLDER/

但随后出现多个错误,表明上述操作未成功完成:

[2021-04-28 16:23:27,934] {redis.py:363} ERROR - Connection to Redis lost: Retry (0/20) now.@-@{"workflow": "tutorial", "task-id": "print_date", "execution-date": "2021-04-23T09:18:13+00:00"}

我能够使用 gcloud cli 运行单独的任务,但由于某种原因,类似的代码不会在插件中执行。我错过了什么吗?

更新

这是工作负载 [1] 的屏幕截图:https ://i.stack.imgur.com/GHUv6.png

标签: airflowgoogle-cloud-composergoogle-cloud-scheduler

解决方案


推荐阅读