首页 > 解决方案 > 基于数据库连接上可用的行动态创建 DAG

问题描述

我想从数据库表查询中创建一个动态创建的 DAG。当我尝试从精确数字范围或基于气流设置中的可用对象创建动态创建 DAG 时,它成功了。但是,当我尝试使用 PostgresHook 并为表的每一行创建一个 DAG 时,我可以看到每当我在表中添加新行时都会生成一个新的 DAG。但是事实证明,我无法在气流网络服务器 ui 上单击新创建的 DAG。有关更多上下文,我正在使用 Google Cloud Composer。我已经按照DAG 中提到的步骤在 Google Cloud Composer 网络服务器上不可点击,但在本地 Airflow 上工作正常。但是它仍然不适用于我的情况。

这是我的代码

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["airflow@airflow.com"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
           schedule,
           default_args):
def hello_world_py(*args):
    print 'Hello from DAG: {}'.format(dag_id)

dag = DAG(dag_id,
          schedule_interval=timedelta(days=1),
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag_id=dag_id)

return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

最好的,

标签: postgresqlairflowairflow-scheduler

解决方案


这可以使用 [1] 中提到的步骤使用自我管理的 Airflow Webserver 来解决。完成此操作后,如果您决定在自管理网络服务器前添加身份验证,则创建入口后,您的 BackendServices 应该出现在 Google IAP 控制台上,您可以启用 IAP。如果您想以编程方式访问您的气流,您还可以使用服务帐户为您的自我管理的 Airflow Web 服务器 [2] 使用 JWT 身份验证。

[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2] https://cloud.google.com/iap/docs/authentication-howto


推荐阅读