dask - 并行 SQL 查询
问题描述
如何使用 dask 并行运行具有不同列维度的 SQL 查询?以下是我的尝试:
from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
@delayed
def loadsql(sql):
return pd.read_sql_query(sql,con)
results = [loadsql(x) for x in sql_to_run]
dask.compute(results)
df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]
但是,这会导致引发以下错误:
DatabaseError: sql 执行失败: "SQL QUERY" ORA-01013: 用户请求取消当前操作无法回滚
然后不久之后出现另一个错误:
MultipleInstanceError:正在创建多个不兼容的 TerminalInteractiveShell 子类实例。
sql_to_run 是不同 sql 查询的列表
有什么建议或指点吗??谢谢!
更新 9.7.18
认为这更像是我没有足够仔细地阅读文档的情况。事实上,在 loadsql 函数之外的 con 是导致问题的原因。以下是现在似乎按预期工作的代码更改。
def loadsql(sql):
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
result = pd.read_sql_query(sql,con)
con.close()
return result
values = [delayed(loadsql)(x) for x in sql_to_run]
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds
解决方案
我的猜测是,oracle 客户端不是线程安全的。如果 conn 对象序列化,您可以尝试使用进程运行(通过使用多处理调度程序或分布式调度程序) - 这可能不太可能。更有可能工作的是在 内创建连接loadsql
,因此每次通话都会重新制作,并且不同的连接希望不会相互干扰。
推荐阅读
- python - 更优雅的递归迭代创建员工列表的方式
- json - Postgres 转换具有重复 ID 的 json
- angular - 无法将 Thrift 生成的 Typescript 导入 Angular 7 项目
- node.js - 无法在 Google Drive 上使用 googleapis 上传文件
- sikuli - 使用 Sikuli Finder() 搜索带有图标的屏幕截图,在循环中使用时提供类似缓存的响应
- android - 是否可以在不请求任何额外权限的情况下修改保存在缓存目录中的图像?
- r - 根据另一列的最大实例数对数据框进行子集
- html - 向右浮动将 div 推到页面末尾
- php - PHP 外部 IP
- python - 使用 cx_Oracle executemany() 从 Python 列表/字典批量插入到 Oracle 数据库