python - 如何使用 dask 读取 csv 和处理行?
问题描述
我想读取一个 28Gb 的 csv 文件并打印内容。但是,我的代码:
import json
import sys
from datetime import datetime
from hashlib import md5
import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd
from kyotocabinet import *
class IndexInKyoto:
def hash_string(self, string):
return md5(string.encode('utf-8')).hexdigest()
def dbproc(self, db):
db[self.hash_string(self.row)] = self.row
def index_row(self, row):
self.row = row
DB.process(self.dbproc, "index.kch")
start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes') # convert to pandas
df = df.to_dict(orient='records')
for row in df:
ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)
不管用。当我运行命令时,htop
我可以看到 dask 正在运行,但没有任何输出。也没有创建任何 index.kch 文件。我在不使用 dask 的情况下咆哮同样的事情,它运行良好;我正在使用 Pandas 流 api ( chunksize
) 但它太慢了,因此我想使用 dask。
解决方案
df = df.compute(scheduler='processes') # convert to pandas
不要这样做!
您在单独的进程中加载这些片段,然后将所有要缝合的数据传输到主进程中的单个数据帧中。这只会增加您的处理开销,并在内存中创建数据的副本。
如果您想要做的只是(出于某种原因)将每一行打印到控制台,那么使用 Pandas 流式 CSV 阅读器(pd.read_csv(chunksize=..)
)会非常好。您可以使用 Dask 的分块运行它,如果您在读取数据的工作人员中进行打印,则可能会得到加速:
df = dd.read_csv(..)
# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
for row in df:
print(row)
dask.compute(*[print_a_block(d) for d in df.to_delayed()])
请注意,for row in df
实际上为您提供了列,也许您想要 iterrows,或者您实际上想要以某种方式处理您的数据。
推荐阅读
- java - Android 设置具有唯一 ID 的闹钟
- python - If there is a way to pre-calculate SQL View in order to speed-up queries from it?
- activiti - 在部署时将 activiti 从 5.22.0 迁移到 6.0.0 会导致 IllegalThreadStateException
- python - 无法使用 readlines() 方法从文本文件中读取
- javascript - 如何让这个 React 方法返回语句/变量?
- mod-rewrite - Mod rewrite query parameter validation and blocking also request url blocking
- python - 如何在 django 上使用 curl 测试获取请求?
- c++ - 在头文件之外实现静态方法时未定义的引用
- c# - 通过反射在脚本组件中组装适用于随机情况
- java - 如何在Android中从url显示授权的pdf而不下载它?