postgresql - 基于数据库连接上可用的行动态创建 DAG
问题描述
我想从数据库表查询中创建一个动态创建的 DAG。当我尝试从精确数字范围或基于气流设置中的可用对象创建动态创建 DAG 时,它成功了。但是,当我尝试使用 PostgresHook 并为表的每一行创建一个 DAG 时,我可以看到每当我在表中添加新行时都会生成一个新的 DAG。但是事实证明,我无法在气流网络服务器 ui 上单击新创建的 DAG。有关更多上下文,我正在使用 Google Cloud Composer。我已经按照DAG 中提到的步骤在 Google Cloud Composer 网络服务器上不可点击,但在本地 Airflow 上工作正常。但是它仍然不适用于我的情况。
这是我的代码
from datetime import datetime, timedelta
from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os
default_args = {
"owner": "debug",
"depends_on_past": False,
"start_date": datetime(2018, 10, 17),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
def create_dag(dag_id,
schedule,
default_args):
def hello_world_py(*args):
print 'Hello from DAG: {}'.format(dag_id)
dag = DAG(dag_id,
schedule_interval=timedelta(days=1),
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id=dag_id,
python_callable=hello_world_py,
dag_id=dag_id)
return dag
dag = DAG("dynamic_yolo_pg_", default_args=default_args,
schedule_interval=timedelta(hours=1))
"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
dag_id = command.id
schedule = timedelta(days=1)
id = "dynamic_yolo_" + str(dag_id)
print id
globals()[id] = create_dag(id,
schedule,
default_args)
最好的,
解决方案
这可以使用 [1] 中提到的步骤使用自我管理的 Airflow Webserver 来解决。完成此操作后,如果您决定在自管理网络服务器前添加身份验证,则创建入口后,您的 BackendServices 应该出现在 Google IAP 控制台上,您可以启用 IAP。如果您想以编程方式访问您的气流,您还可以使用服务帐户为您的自我管理的 Airflow Web 服务器 [2] 使用 JWT 身份验证。
[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver
推荐阅读
- asp.net-core - 如何在 ASP.NET Core 的 .json 配置文件中以跨平台方式存储文件路径?
- python - 尝试使用 python 客户端获取 CRD 时出现 404
- javascript - 使用 Html 或 JavaScript 重定向到多个 URL
- java - 将字符串中的负标记设置为零(java)
- unit-testing - rxjava testScheduler 竞争条件
- pytorch - 如何获得不包括某些索引的 argmaxed 火炬张量?
- python - 如何在pyhdb中指定数据库名称
- python - 如何在不同的python程序之间传递信息
- javascript - 加载值时如何编辑表单?
- c# - 具有有效而非 404 空值的 OData SingleResult