python - 具有多个数据库连接的 lambda 上的 SqlAlchemy 和 pyMysql 连接池
问题描述
所以问题是我有多个数据库,我想在 SqlAlchemy 中使用相同的数据库池。这驻留在 Lambda 上,并且池是在 Lambda 启动时创建的。我希望后续的数据库连接使用现有的池。
工作正常的是初始池连接bpConnect
和对该连接的任何后续查询。
不起作用的是连接companyConnect
。我收到以下错误:
sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'
我有这些用于我的连接:
# Pooling
import sqlalchemy.pool as pool
#################### Engines ###################################################
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
engine = create_engine(engine_endpoint, echo_pool=True)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.StaticPool(bpGetConnection)
def companyGetConnection(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
compEngine = create_engine(engine_endpoint, pool=bpPool)
session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
db = Session()
return db
#################### POOLING #############################################
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
conn = companyGetConnection(database)
return conn
#################################################################
在此示例中调用它们:
from connections import companyConnect, bpConnect
from models import Company, Customers
def getCustomers(companyID):
db = bpConnect()
myQuery = db.query(Company).filter(Company.id == companyID).one()
compDB = companyConnect(myQuery.database)
customers = compDB.query(Customers).all()
return customers
解决方案
我想出了如何使用 lambda 上的动态池来做到这一点:
class DBRegistry(object):
_db = {}
def get(self, url, **kwargs):
if url not in self._db:
engine = create_engine(url, **kwargs)
Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
session = scoped_session(Session)
self._db[url] = session
return self._db[url]
compDB = DBRegistry()
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
engine = create_engine(engine_endpoint)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
conn = compDB.get(engine_endpoint, poolclass=QueuePool)
return conn
所以基本上它会使用一个池作为主数据库所需的持续连接,而另一个池将动态更改它需要的数据库。当需要连接到这些公司数据库之一时,它将检查该池是否已存在于池的注册表中。如果池不存在,它将创建一个并注册它。
推荐阅读
- r - 显示小标题的所有变量
- selenium - XPath 不适用于其中一种硒方案
- c# - 如何修复“构建到 WebGL 后 Unity 变换位置发生变化”?
- node.js - Morgan 和 Rotating-file-stream:interval 无法正常工作
- javascript - Javascript For循环一次附加相同的元素
- python - GIMP:POC 结果与减去混合模式的 GIMP 不匹配
- go - 请告诉我如何将多数组绑定到结构
- android - 如何解决 Android 上的位图内存泄漏?
- scala - Spark Kafka 到 HDFS 写入器的速度受到限制
- angular - 如何从 Angular 中的 API 获取单行数据