apache-spark - 使用 pyspark 读取多个 csv 文件。只有当数据大小超过一定大小时
问题描述
使用 pyspark 读取多个 csv 文件。只有当数据大小超过一定大小时
代码
val rawStocks = allStocks.filter(_.size >= 260 * 5 + 10)
示例 pyspark 代码
rawStocks = filter(lambda stock: len(stock) >= 260*5+10,readHistories(prefix + "stocks/"))
我的代码
stocksDir = sc.textFile("./data/stocks/")
header = stocksDir.first()
stocksDir = stocksDir.filter(lambda x : x!= header)
stocksDir.take(5)
> ['31-Dec-13,22.25,22.47,22.15,22.43,33246316',
> '30-Dec-13,22.00,22.33,22.00,22.25,27713912',
> '27-Dec-13,21.79,22.05,21.78,22.02,24100877',
> '26-Dec-13,21.72,21.88,21.72,21.80,17067392',
> '24-Dec-13,21.46,21.76,21.44,21.69,18371968']
我试试这个。但 len(fields) 是计算行数,而不是 csv 文件大小。
stocksDir.map(lambda x:x.split(header)).filter(lambda fields:len(fields)>0).collect()
> [['', ''],
['31.Dec.13,32.38,32.38,31.66,31.66,598'],
['16.Dec.13,31.53,31.53,31.53,31.53,363'],
['9.Dec.13,32.18,32.26,31.58,31.83,30475'],
['22.Nov.13,31.05,31.43,31.05,31.43,735'],
['', ''],
['31.Dec.13,21.31,21.76,20.8,21.11,61631'],
['21.Nov.13,19.25,19.78,19.22,19.57,55711']]
stocksDir.map(lambda x:x.split(header)).filter(lambda fields:len(fields)>5).collect()
> []
解决方案
推荐阅读
- json - JSON Schema `required` 允许空字符串作为值
- jquery - 根据逗号分隔的输入中的数字进行求和运算
- elasticsearch-watcher - 定义观察者时如何获取输入日志消息的详细信息?
- c# - 如何从字符串中收集匹配值的数组?
- hive - 蜂巢显示空值
- android - 从可点击的谷歌地图设置标记的标题
- css - css 在 vuejs 应用程序上加载速度不够快的问题
- artifactory - 从 jfrog CLI 发布构建信息以在 Artifactory 的树浏览器中更新 Used By 或 Produce By 信息
- python - [Python][Tornado] 是否可以在一个 python 程序中拥有多个不同端口和消息的 websocket?
- spring - Spring:无法获取此流程的模型