首页 > 解决方案 > 如何使用 PyGreSQL 执行并行查询?

问题描述

我正在尝试与 PyGreSQL 和多处理并行运行多个查询,但下面的代码挂起而不返回:

from pg import DB
from multiprocessing import Pool
from functools import partial


def create_query(table_name):
  return f"""create table {table_name} (id integer);
  CREATE INDEX ON {table_name} USING BTREE (id);"""

my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]


def execute_query(conn_string, query):
  con = DB(conn_string)
  con.query(query)
  con.close()

rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)

有什么办法让它工作吗?如果一个查询失败而另一个查询回滚,是否可以在同一个“事务”中进行 3 个正在运行的查询?

标签: python-3.xpygresql

解决方案


一个明显的问题是,您总是运行pool.map,不仅在主进程中,而且在并行子进程中使用的解释器导入脚本时。你应该这样做:

def run_all():
    with Pool(processes=len(my_queries)) as pool:
        pool.map(partial(execute_query,rs_conn_string), my_queries)

if __name__ == '__main__':
    run_all()

关于您的第二个问题,这是不可能的,因为事务是每个连接的,如果您这样做,它们将存在于单独的进程中。

异步命令处理可能是您想要的,但PyGreSQL尚不支持。Psygopg + aiopg可能更适合做这样的事情。


推荐阅读