首页 > 解决方案 > 如何将大型 Oracle 表的 SUBSET 加载到 Dask 数据帧中?

问题描述

这是我尝试过的:

dask_rf = dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)

这给了我一个“大对象”警告并建议使用 client.scatter。问题是 client.scatter 似乎需要首先将数据加载到 Pandas 数据帧中,这就是我首先使用 Dask 的原因,因为 RAM 限制。

Oracle 表太大而无法使用 Dask 的 read_sql_table 读取,因为 read_sql_table 不会以任何方式过滤表。

想法?Dask 不适用于我的用例?

编辑 - 根据下面的答案并在研究如何这样做之后,这是我尝试转换为使用 sqlalchemy 表达式:

from sqlalchemy import create_engine, Table, Column, String, MetaData, select

sql_engine = create_engine(f'oracle+cx_oracle://username:password@environment')

metadata = MetaData(bind=sql_engine)

table_reference = Table('table', metadata, autoload=True, schema='schema')

s = select([table_reference ]).where(table_reference .c.field_to_filter == filtered_value)

import dask.dataframe as dd

dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema = 'schema')

dask_df.count()

Dask 系列结构:npartitions=1 action_timestamp int64 vendor_name ... dtype:int64 Dask 名称:dataframe-count-agg,1996 个任务

dask_df.count().compute()

DatabaseError:(cx_Oracle.DatabaseError)ORA-02391:超出同时 SESSIONS_PER_USER 限制(此错误的背景: http ://sqlalche.me/e/4xp6 )

为什么要尝试连接到 Oracle?

编辑#2 - 以防万一,我已经进行了额外的测试。我想证明 sqlalchemy 可以独立工作,所以我通过以下方式证明了这一点:

result = sql_engine.execute(s)

type(result)

sqlalchemy.engine.result.ResultProxy

result.fetchone()

结果显示

这似乎排除了 SQLAlchemy/Oracle 问题,所以有什么想法可以尝试下一步吗?

标签: dask

解决方案


我现在正在寻找同样的东西。

为了不被卡住......您可能没有足够的 RAM,但您可能有很多可用存储空间。所以......现在的建议

# imports
import pandas as pd 
import cx_Oracle as cx
import dask.dataframe as dd

# Connection stuff
...
conn = ...

# Query
qry = "SELECT * FROM HUGE_TABLE"

# Pandas Chunks
for ix , chunk in enumerate(pd.io.sql.read_sql(qry , conn , ... , chunksize=1000000)):
    pd.DataFrame(chunk).to_csv(f"chunk_{ix}.csv" , sep=";") # or to_parquet 

# Dask dataframe reading from files (chunks)
dataset = dd.read_csv("chunk_*.csv" , sep=";" , blocksie=32e6) # or read_parquet

由于这是 IO 密集型并且您正在执行顺序操作,因此可能需要一段时间。

我对“导出”更快的建议是对表进行分区并按每个分区并行执行块导出。


推荐阅读