python - 在 python 多进程之间共享一个 postgres 连接池
问题描述
我正在尝试将 psycopg2 的连接池与 python 的多进程库一起使用。
目前,尝试以上述方式在线程之间共享连接池会导致:
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
下面的代码应该会重现错误,这是读者必须设置一个简单的 postgres 数据库的警告。
from multiprocessing import Pool
from psycopg2 import pool
import psycopg2
import psycopg2.extras
connection_pool = pool.ThreadedConnectionPool(1, 200, database='postgres',
user='postgres', password='postgres', host='localhost')
class ConnectionFromPool:
"""
Class to establish a connection with the local PostgreSQL database
To use:
query = SELECT * FROM ticker_metadata
with ConnectionFromPool() as cursor:
cursor.execute(query)
results = cursor.fetchall()
Returns:
Arrayed Dictionary of results
[{...},{...},{...}]
"""
def __init__(self):
self.connection_pool = None
self.cursor = None
self.connection = None
def __enter__(self):
self.connection = connection_pool.getconn()
self.cursor = self.connection.cursor(
cursor_factory=psycopg2.extras.RealDictCursor)
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
self.connection.rollback()
else:
self.cursor.close()
self.connection.commit()
connection_pool.putconn(self.connection)
def test_query(col_attribute):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
def multiprocessing(func, args, n_workers=2):
"""spawns multiple processes
Args:
func: function, to be performed
args: list of args to be passed to each call of func
n_workers: number of processes to be spawned
Return:
A list, containing the results of each proccess
"""
with Pool(processes=n_workers) as executor:
res = executor.starmap(func, args)
return list(res)
def main():
args = [[i] for i in range(1000)]
results = multiprocessing(test_query, args, 2)
if __name__ == "__main__":
main()
我已经尝试过的:
- 让每个进程打开和关闭自己与数据库的连接,而不是尝试使用连接池。这很慢。
- 让每个进程使用自己的连接池,这也很慢。
- 将连接传递一个 psycopg2 连接对象给每个进程,而不是
with
在 sql 查询中使用语句隐式调用它。这会引发一个错误,声称连接对象不可腌制。
注意:如果我sleep
在除一个进程之外的所有进程中都进行了操作,则非睡眠进程运行良好并执行其查询,直到其余线程未睡眠,然后我得到上述错误。
我已经阅读的内容:
最后:
如何在 python 的多进程(多处理)中使用连接池(psycopg2)。我愿意使用其他库,只要它们与 python 和 postgresql 数据库一起使用。
解决方案
这是我的解决方案。解决方案可以分为两部分:
- 具有将由每个唯一进程执行的包装函数。这个包装函数的主要目的是创建自己的连接池
- 对于步骤 1 中包装函数执行的每个查询,将连接池传递给查询函数(在上面的示例中,这是
test_query
)
更详细地参考问题中的示例:
步骤1
创建将重用每个进程一个连接池的包装函数:
def multi_query(list_of_cols):
# create a new connection pool per Process
new_pool = new_connection_pool()
# Pass the pool to each query
for col in list_of_cols:
test_query(col, new_pool)
第2步
修改查询函数以接受连接池:
旧test_query
:
def test_query(col_attribute):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
新test_query
:
def test_query(col_attribute, connection_pool=None):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool(connection_pool) as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
推荐阅读
- typescript - 打字稿:接口联盟
- reactjs - 如何确保整个应用都可以使用 Google 注销按钮
- viewmodel - ViewModel 未在 DialogFragment onDestroy 上清除
- python - 如何根据另一列的值从一列中获取第 n 个值(Python)
- reactjs - 在反应管理列表中按不同或多个来源过滤
- r - 如何迭代我的向量中的所有元素以进行反距离加权
- datetime - 如何在飞镖中使用频率分割两次?
- python - 多索引 DF 的 Python 数据框相关系数
- azure - az aks 无所事事地制造挂起
- c# - 是否可以在 asp.net 5.0 应用程序中使用 DataContractJsonSerializer 进行 json 序列化?