
Problem description

Read multiple CSV files with PySpark, but keep a file only when its data size exceeds a certain threshold.

Scala code

// keep only stocks with at least 260*5 + 10 rows of price history
val rawStocks = allStocks.filter(_.size >= 260 * 5 + 10)

Sample PySpark code

rawStocks = filter(lambda stock: len(stock) >= 260*5+10, readHistories(prefix + "stocks/"))
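
readHistories and prefix are not defined in the question. Since the built-in filter above runs on a plain Python sequence, the helper presumably returns one list of rows per file. A minimal sketch of such a helper, an assumption rather than the original implementation, built on sc.wholeTextFiles so that file boundaries are preserved:

# hypothetical readHistories: returns one list of data rows per CSV file,
# assuming `sc` is the existing SparkContext and each file starts with a header line
def readHistories(path):
    files = sc.wholeTextFiles(path)                 # (filename, full contents) pairs
    rows = files.map(lambda kv: kv[1].strip().split("\n")[1:])
    return rows.collect()                           # list of per-file row lists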

My code

# read every file under the directory into a single RDD of text lines
stocksDir = sc.textFile("./data/stocks/")
# take the first line as the header and drop all header lines from the data
header = stocksDir.first()
stocksDir = stocksDir.filter(lambda x: x != header)
stocksDir.take(5)

> ['31-Dec-13,22.25,22.47,22.15,22.43,33246316', 
> '30-Dec-13,22.00,22.33,22.00,22.25,27713912', 
> '27-Dec-13,21.79,22.05,21.78,22.02,24100877', 
> '26-Dec-13,21.72,21.88,21.72,21.80,17067392',  
> '24-Dec-13,21.46,21.76,21.44,21.69,18371968']

I tried this, but len(fields) counts the number of lines, not the size of each CSV file.

stocksDir.map(lambda x:x.split(header)).filter(lambda fields:len(fields)>0).collect()

> [['', ''],
> ['31.Dec.13,32.38,32.38,31.66,31.66,598'],
> ['16.Dec.13,31.53,31.53,31.53,31.53,363'],
> ['9.Dec.13,32.18,32.26,31.58,31.83,30475'],
> ['22.Nov.13,31.05,31.43,31.05,31.43,735'],
> ['', ''],
> ['31.Dec.13,21.31,21.76,20.8,21.11,61631'],
> ['21.Nov.13,19.25,19.78,19.22,19.57,55711']]


stocksDir.map(lambda x:x.split(header)).filter(lambda fields:len(fields)>5).collect()
> []
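
Both attempts split individual lines of stocksDir, but sc.textFile on a directory has already merged every file into one flat RDD of lines, so no per-line operation can recover how many rows each original file contained. For example, the following returns the total line count across the whole directory, not the size of any single file:

# total number of data lines across all files combined, not a per-file size
stocksDir.count()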

Tags: apache-spark, pyspark

Solution
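
A sketch of one way to reproduce the per-file filtering of the Scala code, assuming the same ./data/stocks/ layout with a one-line header per file. sc.wholeTextFiles keeps each file as a (filename, contents) pair, so the row count of every file stays available:

# build (filename, list of data rows) per file, dropping the header line
histories = (sc.wholeTextFiles("./data/stocks/")
               .mapValues(lambda content: content.strip().split("\n")[1:]))
# keep only the files with at least 260*5 + 10 rows, mirroring the Scala filter
rawStocks = histories.filter(lambda kv: len(kv[1]) >= 260*5 + 10)
rawStocks.keys().take(5)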

