首页 > 解决方案 > 在 python 多进程之间共享一个 postgres 连接池

问题描述

我正在尝试将 psycopg2 的连接池与 python 的多进程库一起使用。

目前,尝试以上述方式在线程之间共享连接池会导致:

psycopg2.OperationalError: SSL error: decryption failed or bad record mac

下面的代码应该会重现错误,这是读者必须设置一个简单的 postgres 数据库的警告。


from multiprocessing import Pool
from psycopg2 import pool
import psycopg2
import psycopg2.extras

connection_pool = pool.ThreadedConnectionPool(1, 200, database='postgres',
    user='postgres', password='postgres', host='localhost')

class ConnectionFromPool:
    """
    Class to establish a connection with the local PostgreSQL database
    To use:
        query = SELECT * FROM ticker_metadata
        with ConnectionFromPool() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
    Returns:
        Arrayed Dictionary of results
        [{...},{...},{...}]
    """
    def __init__(self):
        self.connection_pool = None
        self.cursor = None
        self.connection = None
    def __enter__(self):
        self.connection = connection_pool.getconn()
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        return self.cursor
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.cursor.close()
            self.connection.commit()
        connection_pool.putconn(self.connection)


def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
    with ConnectionFromPool() as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
    return result


def multiprocessing(func, args, n_workers=2):
    """spawns multiple processes

    Args:
        func: function, to be performed
        args: list of args to be passed to each call of func
        n_workers: number of processes to be spawned

    Return:
        A list, containing the results of each proccess
    """
    with Pool(processes=n_workers) as executor:
        res = executor.starmap(func, args)

    return list(res)


def main():
    args = [[i] for i in range(1000)]
    results = multiprocessing(test_query, args, 2)


if __name__ == "__main__":
    main()

我已经尝试过的:

  1. 让每个进程打开和关闭自己与数据库的连接,而不是尝试使用连接池。这很慢。
  2. 让每个进程使用自己的连接池,这也很慢。
  3. 将连接传递一个 psycopg2 连接对象给每个进程,而不是with在 sql 查询中使用语句隐式调用它。这会引发一个错误,声称连接对象不可腌制。

注意:如果我sleep在除一个进程之外的所有进程中都进行了操作,则非睡眠进程运行良好并执行其查询,直到其余线程未睡眠,然后我得到上述错误。

我已经阅读的内容:

  1. 在 Python 中跨进程共享与 postgres db 的连接
  2. Python:从 Thread 调用时解密失败或错误记录 mac
  3. SQLAlchemy 和多个进程的连接问题

最后:

如何在 python 的多进程(多处理)中使用连接池(psycopg2)。我愿意使用其他库,只要它们与 python 和 postgresql 数据库一起使用。

标签: pythonpostgresqlmultiprocessingpsycopg2

解决方案


这是我的解决方案。解决方案可以分为两部分:

  1. 具有将由每个唯一进程执行的包装函数。这个包装函数的主要目的是创建自己的连接池
  2. 对于步骤 1 中包装函数执行的每个查询,将连接池传递给查询函数(在上面的示例中,这是test_query

更详细地参考问题中的示例:

步骤1

创建将重用每个进程一个连接池的包装函数:

def multi_query(list_of_cols):
    # create a new connection pool per Process
    new_pool = new_connection_pool()

    # Pass the pool to each query
    for col in list_of_cols:
        test_query(col, new_pool)

第2步

修改查询函数以接受连接池:

test_query

def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
   with ConnectionFromPool() as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
   return result

test_query

def test_query(col_attribute, connection_pool=None):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
    with ConnectionFromPool(connection_pool) as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
    return result

推荐阅读