首页 > 解决方案 > 如何将 Spark RDD 中的键/序列对分解为键/值对?

问题描述

我有一个这种格式的RDD:

(key, [v1, v2, v3, ..., vn])

我想把它变成这样的RDD:

(key, v1), (key, v2), ..., (key, vn)

为了详细说明,我使用sc.wholeTextFile(dir)读取目录中的所有文本文档,dirRDD 将是这样的:(document, content). 我想把它转换成这样的RDD (document, word):. 换句话说,我想将文档信息保留在 RDD 中。

我曾经RDD.flatMap(lambda (document, text): (document, re.split(' '), text))将文本拆分为单词,但找不到将单词序列分解为单独元素的方法。

标签: pythonapache-sparkpyspark

解决方案


在 Scala 中用 case似乎更直观。反正。

在 pyspark 中:

%python
files = sc.wholeTextFiles("/FileStore/tables/x*.txt",0).map(lambda x: (x[0],x[1].replace('?',' ').replace('.',' ').replace('\r',' ').replace('\n',' ')  )) 
wordsAndFile = files.map(lambda k_v: (k_v[0], k_v[1].split(" ")))
res = wordsAndFile.map(lambda k_v: [(k_v[0], x) for x in k_v[1]])
final = res.flatMap(lambda x: x).filter(lambda x: x[1] is not u'')
final.collect()

使用 3 个文件,我得到了这个,部分显示:

Out[129]: [('dbfs:/FileStore/tables/x1.txt', 'Hi'),
('dbfs:/FileStore/tables/x1.txt', 'how'),
('dbfs:/FileStore/tables/x1.txt', 'are'),
('dbfs:/FileStore/tables/x1.txt', 'you'),
('dbfs:/FileStore/tables/x1.txt', 'I'),
('dbfs:/FileStore/tables/x1.txt', 'am'),
('dbfs:/FileStore/tables/x1.txt', 'fine'),
('dbfs:/FileStore/tables/x1.txt', '3rd'),
('dbfs:/FileStore/tables/x1.txt', 'line'),
('dbfs:/FileStore/tables/x2.txt', 'John'),
('dbfs:/FileStore/tables/x2.txt', 'I'),
('dbfs:/FileStore/tables/x2.txt', 'am'),
...
...

只是为了优点,带有大小写的 Scala 方法更容易,更少 \r, \n 东西。没有应用replaceAll,只是为了说明一点:

val files = sc.wholeTextFiles("/FileStore/tables/x*.txt",0) 
val lines2 = files.map { case (x, y) =>  (x, y.split(" ")) }
val lines3 = lines2.flatMap {  case (k, v) => { v.map(x => (k,x))    }  }
lines3.collect

推荐阅读