python - 如何将 Spark RDD 中的键/序列对分解为键/值对?
问题描述
我有一个这种格式的RDD:
(key, [v1, v2, v3, ..., vn])
我想把它变成这样的RDD:
(key, v1), (key, v2), ..., (key, vn)
为了详细说明,我使用sc.wholeTextFile(dir)
读取目录中的所有文本文档,dir
RDD 将是这样的:(document, content)
. 我想把它转换成这样的RDD (document, word)
:. 换句话说,我想将文档信息保留在 RDD 中。
我曾经RDD.flatMap(lambda (document, text): (document, re.split(' '), text))
将文本拆分为单词,但找不到将单词序列分解为单独元素的方法。
解决方案
在 Scala 中用 case似乎更直观。反正。
在 pyspark 中:
%python
files = sc.wholeTextFiles("/FileStore/tables/x*.txt",0).map(lambda x: (x[0],x[1].replace('?',' ').replace('.',' ').replace('\r',' ').replace('\n',' ') ))
wordsAndFile = files.map(lambda k_v: (k_v[0], k_v[1].split(" ")))
res = wordsAndFile.map(lambda k_v: [(k_v[0], x) for x in k_v[1]])
final = res.flatMap(lambda x: x).filter(lambda x: x[1] is not u'')
final.collect()
使用 3 个文件,我得到了这个,部分显示:
Out[129]: [('dbfs:/FileStore/tables/x1.txt', 'Hi'),
('dbfs:/FileStore/tables/x1.txt', 'how'),
('dbfs:/FileStore/tables/x1.txt', 'are'),
('dbfs:/FileStore/tables/x1.txt', 'you'),
('dbfs:/FileStore/tables/x1.txt', 'I'),
('dbfs:/FileStore/tables/x1.txt', 'am'),
('dbfs:/FileStore/tables/x1.txt', 'fine'),
('dbfs:/FileStore/tables/x1.txt', '3rd'),
('dbfs:/FileStore/tables/x1.txt', 'line'),
('dbfs:/FileStore/tables/x2.txt', 'John'),
('dbfs:/FileStore/tables/x2.txt', 'I'),
('dbfs:/FileStore/tables/x2.txt', 'am'),
...
...
只是为了优点,带有大小写的 Scala 方法更容易,更少 \r, \n 东西。没有应用replaceAll,只是为了说明一点:
val files = sc.wholeTextFiles("/FileStore/tables/x*.txt",0)
val lines2 = files.map { case (x, y) => (x, y.split(" ")) }
val lines3 = lines2.flatMap { case (k, v) => { v.map(x => (k,x)) } }
lines3.collect
推荐阅读
- php - 如何使用 php 在 mandrill 中使用电子邮件标签(用于跟踪电子邮件)
- sql - SQL 查询仅获取具有特定类型的记录
- java - Java tomcat应用程序挂起,停止后无法重新启动tomcat
- networking - 计算两个代理品种之间的交互以更改一个的属性类型(netlogo)
- php - Laravel5:如果没有准备好运行的命令,如何禁用默认调度程序消息
- optimization - 如何在 Cplex 中使用“IloOplOutputFile”来编写具有多个索引的变量?
- html - 如何在使用省略号文本溢出样式时减小表格列的宽度?
- c# - UWP:写一个巨大的日志文件很慢
- c - C中kill系统调用的模棱两可的行为
- javascript - ActiveXObject 是否有任何替代方案。我想在 chrome 浏览器中单击按钮时打开命令提示符