python - Airflow 中外部连接的连接池
问题描述
我正在尝试为在 Airflow 中创建的外部连接找到一种连接池管理方法。
气流版本:2.1.0
Python 版本:3.9.5
气流数据库:SQLite
创建外部连接:MySQL 和雪花
我知道airflow.cfg 文件中有属性
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
但是这些属性用于管理气流内部数据库,在我的例子中是 SQLite。
我几乎没有在 MySQL 和 Snowflake 中读取或写入数据的任务。
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE
)
和
insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
从 MySQL 读取数据
def get_records():
mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
records = mysql_hook.get_records(sql=r"""Some select query""")
print(records)
我观察到的是,正在为 Snowflake 的每个任务(同一个 dag 中有多个任务)创建一个新会话,但尚未验证 MySQL 的相同。
有没有办法维护外部连接的连接池(在我的情况下是雪花和 MySQL)或任何其他方式在同一会话中运行同一 DAG 中的所有查询?
谢谢
解决方案
Airflow 提供使用池作为限制外部服务并发的一种方式。
您可以通过 UI 创建池:菜单 -> 管理 -> 池
或使用CLI:
airflow pools set NAME slots
池中的槽定义了使用资源的任务可以并行运行的数量。如果池已满,则任务将排队,直到插槽打开。
在运算符中使用池只需添加pool=Name
到运算符。
在您的情况下,假设Pool
是使用名称雪花创建的,然后:
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE,
pool='snowflake',
)
请注意,默认情况下,任务占用池中的 1 个插槽,但这是可配置的。pool_slots
如果使用示例,一个任务可能占用超过 1 个插槽:
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
...
pool='snowflake',
pool_slots=2,
)
推荐阅读
- python - Pandas to_csv 函数格式减号为“–”
- python - 为什么我的程序让我的机器人关闭电源?
- r - 在 R 中使用 pgmm (plm-package) 时,系统完全是单一错误
- javascript - 如何在 React js 中使用 ref 更改元素的样式
- python - 启动 Python 调试器时 VS Code 中的终端错误
- python - 虚拟环境突然损坏(没有名为“contextlib”的模块)
- excel - 如何考虑多行过滤数据?
- java - spring cloud 合约可以处理不同响应的重复请求吗?
- c# - 如何在 .NET Core 中使用 TransactionScope
- swift - iphone 12 / 13 mini 模拟器中的尺寸不正确