python-3.x - Postgres、sqlalchemy 和多处理
问题描述
我对 python 多处理相对较新,并且在与该主题相关的许多问题上苦苦挣扎。我最新的问题是多处理、sqlalchemy 和 postgres 的组合。有了这个组合,我有时会得到一个
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac
经过研究,我在文档中发现了这个提示:
https://docs.sqlalchemy.org/en/13/core/pooling.html “重要的是,在使用连接池时,以及在使用通过 create_engine() 创建的引擎时,池连接不共享给一个分叉的进程。TCP 连接表示为文件描述符,它通常跨进程边界工作,这意味着这将导致代表两个或多个完全独立的 Python 解释器状态对文件描述符的并发访问。
有两种方法可以处理这个问题。
第一种是在子进程中创建一个新引擎,或者在现有引擎上,在子进程使用任何连接之前调用 Engine.dispose()。这将从池中删除所有现有连接,以便创建所有新连接。"
和这个:
uWSGI、Flask、sqlalchemy 和 postgres:SSL 错误:解密失败或坏记录 mac “问题最终是 uwsgi 的分叉。
当使用主进程处理多个进程时,uwsgi 在主进程中初始化应用程序,然后将应用程序复制到每个工作进程。问题是,如果在初始化应用程序时打开数据库连接,就会有多个进程共享同一个连接,从而导致上述错误。”
我的解释是,在使用多处理时,我必须确保每个进程都使用新引擎。在我的子进程中,只有一个类可以读写 postgres-db,所以我决定在类中定义一个 slqalchemy 引擎:
class WS_DB_Booker():
def __init__(self):
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self.DBSession_inside_class = scoped_session(session_factory_inside_class)
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.DBSession_inside_class()
sql_alc_session.query(and_so_on....
这在第一次试验中运行良好,没有任何问题。但我不确定这是在类中定义引擎的正确方法还是会导致任何问题?
解决方案
您如何分叉您的流程或哪个组件进行分叉是很难理解的。
您需要确保在分叉后实例化WS_DB_Broker
该类!
如果您以错误的方式执行此操作(在 fork 之前实例化),则Engine
可能已经dbapi
在它的Pool
. 请参阅有关使用引擎的 SQLAlchemy文档。
为了使您的错误更加明显,您可以执行以下操作:
import os
class WS_DB_Booker():
def __init__(self):
# Remember the process id from the time of instantiation. If the
# interpreter is forked then the output of `os.getpid()` will change.
self._pid = os.getpid()
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self._session = scoped_session(session_factory_inside_class)
def get_session():
if self._pid != os.getpid():
raise RuntimeError("Forked after instantiating! Please fix!")
return self._session()
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.get_session()
# ^^^^^^^^^^^^^
# this may throw RuntimeError when used incorrectly, thus saving you
# from your own mistake.
sql_alc_session.query(and_so_on....
推荐阅读
- databricks - 事件中心:org.apache.spark.sql.AnalysisException:找不到所需的属性“body”
- laravel - 在 Laravel 中使用 BootstrapVue 上传文件
- node.js - Can not convert a string to number, (price of a product) which is taken from mongodb collection by query
- postgresql - postgres中的选择问题
- java - Java BigInteger to the intValue
- jmeter - “无效的 SQL 类型:sqlKind = UNINITIALIZED”从 JMeter 运行查询时
- javascript - 测试 Javascript AES 加密的完整性
- java - 每次在其中键入内容时,JTextField 都会创建自身的副本
- r - 在 R 中 - 将一行数据迭代地添加到多个文件中
- amazon-web-services - AWS 上的红帽 Openshift