dask - 如何将大型 Oracle 表的 SUBSET 加载到 Dask 数据帧中?
问题描述
这是我尝试过的:
dask_rf = dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)
这给了我一个“大对象”警告并建议使用 client.scatter。问题是 client.scatter 似乎需要首先将数据加载到 Pandas 数据帧中,这就是我首先使用 Dask 的原因,因为 RAM 限制。
Oracle 表太大而无法使用 Dask 的 read_sql_table 读取,因为 read_sql_table 不会以任何方式过滤表。
想法?Dask 不适用于我的用例?
编辑 - 根据下面的答案并在研究如何这样做之后,这是我尝试转换为使用 sqlalchemy 表达式:
from sqlalchemy import create_engine, Table, Column, String, MetaData, select
sql_engine = create_engine(f'oracle+cx_oracle://username:password@environment')
metadata = MetaData(bind=sql_engine)
table_reference = Table('table', metadata, autoload=True, schema='schema')
s = select([table_reference ]).where(table_reference .c.field_to_filter == filtered_value)
import dask.dataframe as dd
dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema = 'schema')
dask_df.count()
Dask 系列结构:npartitions=1 action_timestamp int64 vendor_name ... dtype:int64 Dask 名称:dataframe-count-agg,1996 个任务
dask_df.count().compute()
DatabaseError:(cx_Oracle.DatabaseError)ORA-02391:超出同时 SESSIONS_PER_USER 限制(此错误的背景: http ://sqlalche.me/e/4xp6 )
为什么要尝试连接到 Oracle?
编辑#2 - 以防万一,我已经进行了额外的测试。我想证明 sqlalchemy 可以独立工作,所以我通过以下方式证明了这一点:
result = sql_engine.execute(s)
type(result)
sqlalchemy.engine.result.ResultProxy
result.fetchone()
结果显示
这似乎排除了 SQLAlchemy/Oracle 问题,所以有什么想法可以尝试下一步吗?
解决方案
我现在正在寻找同样的东西。
为了不被卡住......您可能没有足够的 RAM,但您可能有很多可用存储空间。所以......现在的建议
# imports
import pandas as pd
import cx_Oracle as cx
import dask.dataframe as dd
# Connection stuff
...
conn = ...
# Query
qry = "SELECT * FROM HUGE_TABLE"
# Pandas Chunks
for ix , chunk in enumerate(pd.io.sql.read_sql(qry , conn , ... , chunksize=1000000)):
pd.DataFrame(chunk).to_csv(f"chunk_{ix}.csv" , sep=";") # or to_parquet
# Dask dataframe reading from files (chunks)
dataset = dd.read_csv("chunk_*.csv" , sep=";" , blocksie=32e6) # or read_parquet
由于这是 IO 密集型并且您正在执行顺序操作,因此可能需要一段时间。
我对“导出”更快的建议是对表进行分区并按每个分区并行执行块导出。
推荐阅读
- android - Android中线程之间的资源竞争
- c# - 如何优化字符串重写规则程序
- handlebars.js - 如何将车把转换为我在下面提到的 ejs
- python - 如何为列表中的项目分配值?
- python - 如何在字符串中添加单词?
- mysql - 如何将 JSON 对象转换为 MySQL 行?
- stan - rstan 包中有函数 check_rhat() 吗?
- javascript - 变量中的多选
- kotlin - Kotlin - 当表达式超过类类型时
- c# - Azure 上的 Blazor 中的 HttpClient.GetJsonAsync() 导致“无效的 JSON 字符串”,在本地它工作得很好。知道为什么吗?