sqlalchemy - SQLAlchemy 1.4 异步事件监听器
问题描述
尝试
在异步 sqlalchemy 引擎上
使用事件侦听器( https://docs.sqlalchemy.org/en/14/core/events.html#sqlalchemy.events.ConnectionEvents )并收到此错误:{NotImplementedError}asynchronous events are not implemented at this time. Apply synchronous listeners to the AsyncEngine.sync_engine or AsyncConnection.sync_connection attributes.
如果我正确理解这一点,我不能在异步引擎上使用事件,如果我想要事件支持,我必须切换到同步引擎?
engine: Engine = create_async_engine(
URL, echo=True, future=True
)
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False, future=True
)
event.listens_for(engine, "do_connect")(do_connect_listener)
event.listens_for(engine, "engine_connect")(engine_connect_listener)
解决方案
从 github 上的 SQLAlchemy 讨论部分得到了答案。所有功劳归于 OP(https://github.com/sqlalchemy/sqlalchemy/discussions/6594#discussioncomment-836437):
“你好,
正如异常所建议的,您可以在应用事件时使用 sync_engine/sync_connection。这是一个例子:”
import asyncio
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
@event.listens_for(engine.sync_engine, "do_connect")
def do_connect(dialect, conn_rec, cargs, cparams):
print("some-function")
@event.listens_for(engine.sync_engine, "engine_connect")
def engine_connect(conn, branch):
print("engine_connect", conn.exec_driver_sql("select 1").scalar())
async def go():
async with engine.connect() as conn:
res = await conn.exec_driver_sql("select 2")
print("go", res.scalar())
asyncio.run(go())
推荐阅读
- c++ - cpp.sh 打开示例文件
- python - 选择 Conv2D 过滤器值开始
- bash - 获取特定星期的星期几;重击;Linux
- python - Pyspark SQL Pandas UDF:返回一个数组
- node.js - npm 错误!找不到模块“编码”
- go - Memcached Kubernetes 客户端 Golang
- arrays - 从 2 张纸填充一维数组(不循环)
- python - Deep get item from json with default value in python
- webpack - 在不重建的情况下更改 webpack 中的 PublicPath
- c++17 - 属性前的 clang 格式空格