首页 > 解决方案 > 并行 SQL 查询

问题描述

如何使用 dask 并行运行具有不同列维度的 SQL 查询?以下是我的尝试:

from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()

con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")

@delayed
def loadsql(sql):
    return pd.read_sql_query(sql,con)

results = [loadsql(x) for x in sql_to_run] 

dask.compute(results)

df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]

但是,这会导致引发以下错误:

DatabaseError: sql 执行失败: "SQL QUERY" ORA-01013: 用户请求取消当前操作无法回滚

然后不久之后出现另一个错误:

MultipleInstanceError:正在创建多个不兼容的 TerminalInteractiveShell 子类实例。

sql_to_run 是不同 sql 查询的列表

有什么建议或指点吗??谢谢!


更新 9.7.18

认为这更像是我没有足够仔细地阅读文档的情况。事实上,在 loadsql 函数之外的 con 是导致问题的原因。以下是现在似乎按预期工作的代码更改。

def loadsql(sql):
    con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
    result =  pd.read_sql_query(sql,con)
    con.close()
    return result

values = [delayed(loadsql)(x) for x in sql_to_run] 
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds

标签: daskdask-delayed

解决方案


我的猜测是,oracle 客户端不是线程安全的。如果 conn 对象序列化,您可以尝试使用进程运行(通过使用多处理调度程序或分布式调度程序) - 这可能不太可能。更有可能工作的是在 内创建连接loadsql,因此每次通话都会重新制作,并且不同的连接希望不会相互干扰。


推荐阅读