python - 来自 Airflow 数据库挂钩的 SQLAlchemy 引擎
问题描述
从 Airflow 连接 ID 获取 SQLAlchemy 引擎的最佳方法是什么?
目前我正在创建一个钩子,检索它的 URI,然后使用它来创建一个 SQLAlchemy 引擎。
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())
这有效,但两个命令都连接到数据库。
当我在连接上有“额外”参数时,需要第三个连接来检索这些参数(请参阅Retrieve full connection URI from Airflow Postgres hook)
有没有更短更直接的方法?
解决方案
需要明确的是,您的命令确实会建立两个数据库连接,但它连接到两个单独的数据库(除非您尝试连接到 Postgres Airflow 数据库)。初始化钩子的第一行不应该建立任何连接。只有第二行首先从 Airflow 数据库中获取连接详细信息(我认为您无法避免),然后使用它连接到 Postgres 数据库(我认为这是重点)。
你可以稍微简单一些:
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()
这看起来很干净,但是如果您想在不经过的情况下获得更直接的信息PostgresHook
,则可以通过查询 Airflow 的数据库来直接获取它。但是,这意味着您最终将复制代码以从连接对象构建 URI。如果您想继续此操作,get_connection()的底层实现就是一个很好的例子。
from airflow.settings import Session
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)
此外,如果您希望能够在extras
没有单独的数据库获取的情况下访问,您get_uri()
可以get_sqlalchemy_engine()
覆盖BaseHook.get_connection()以将连接对象保存到实例变量以供重用。这需要在 之上创建自己的钩子PostgresHook
,所以我知道这可能并不理想。
class CustomPostgresHook(PostgresHook):
@classmethod
def get_connection(cls, conn_id): # type: (str) -> Connection
conn = super().get_connection(conn_id)
self.conn_obj = conn # can't use self.conn because PostgresHook will overriden in https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93 by a different type of connection
return conn
postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson
一些内置的 Airflow 挂钩已经具有这种行为(grpc、samba、tableau),但它绝对不是标准化的。
推荐阅读
- apache-camel - 如何使用 Camel Jetty 接收包含文件的 multipart/form-data 请求?
- sql-server - SQL Server 多对多没有主键关系
- julia - Julia Plots 动画`gif()` 无限期挂起
- macos - Mac Iterm2 上分支名称旁边的“*”是什么意思?
- excel - if x > y, but < yz 乘以 b 的 IF 函数
- reactjs - Firebase 错误:未创建 Firebase 应用默认值
- python - If 语句中未定义变量(Python,Ursina 模块)
- html - 清理 wordpress woocommerce 产品描述的 HTML?
- python - 在没有 NVIDIA GPU 的情况下使用 CUDA?
- functional-programming - 榆树指导骰子练习