首页 > 解决方案 > Postgres、sqlalchemy 和多处理

问题描述

我对 python 多处理相对较新,并且在与该主题相关的许多问题上苦苦挣扎。我最新的问题是多处理、sqlalchemy 和 postgres 的组合。有了这个组合,我有时会得到一个

 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac

经过研究,我在文档中发现了这个提示:

https://docs.sqlalchemy.org/en/13/core/pooling.html “重要的是,在使用连接池时,以及在使用通过 create_engine() 创建的引擎时,池连接不共享给一个分叉的进程。TCP 连接表示为文件描述符,它通常跨进程边界工作,这意味着这将导致代表两个或多个完全独立的 Python 解释器状态对文件描述符的并发访问。

有两种方法可以处理这个问题。

第一种是在子进程中创建一个新引擎,或者在现有引擎上,在子进程使用任何连接之前调用 Engine.dispose()。这将从池中删除所有现有连接,以便创建所有新连接。"

和这个:

uWSGI、Flask、sqlalchemy 和 postgres:SSL 错误:解密失败或坏记录 mac “问题最终是 uwsgi 的分叉。

当使用主进程处理多个进程时,uwsgi 在主进程中初始化应用程序,然后将应用程序复制到每个工作进程。问题是,如果在初始化应用程序时打开数据库连接,就会有多个进程共享同一个连接,从而导致上述错误。”

我的解释是,在使用多处理时,我必须确保每个进程都使用新引擎。在我的子进程中,只有一个类可以读写 postgres-db,所以我决定在类中定义一个 slqalchemy 引擎:

class WS_DB_Booker():
    def __init__(self):

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self.DBSession_inside_class = scoped_session(session_factory_inside_class)

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.DBSession_inside_class()
            sql_alc_session.query(and_so_on....

这在第一次试验中运行良好,没有任何问题。但我不确定这是在类中定义引擎的正确方法还是会导致任何问题?

标签: python-3.xpostgresqlsqlalchemymultiprocessing

解决方案


您如何分叉您的流程或哪个组件进行分叉是很难理解的。

您需要确保在分叉后实例化WS_DB_Broker该类!

如果您以错误的方式执行此操作(在 fork 之前实例化),则Engine可能已经dbapi在它的Pool. 请参阅有关使用引擎的 SQLAlchemy文档。

为了使您的错误更加明显,您可以执行以下操作:

import os

class WS_DB_Booker():
    def __init__(self):
        # Remember the process id from the time of instantiation. If the
        # interpreter is forked then the output of `os.getpid()` will change.
        self._pid = os.getpid()

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self._session = scoped_session(session_factory_inside_class)

    def get_session():
        if self._pid != os.getpid():
             raise RuntimeError("Forked after instantiating! Please fix!")
        return self._session()

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.get_session()  
            #                      ^^^^^^^^^^^^^
            # this may throw RuntimeError when used incorrectly, thus saving you
            # from your own mistake.
            sql_alc_session.query(and_so_on....

推荐阅读