scala - Scala nested Map to Spark RDD
Problem
I'm trying to convert a list of maps (Seq[Map[String, Map[String, String]]]) into an RDD of tuples, where each key -> value pair of the inner map is flat-mapped into a tuple together with the outer map's key. For example
Map(
1 -> Map('k' -> 'v', 'k1' -> 'v1')
)
becomes
(1, 'k', 'v')
(1, 'k1', 'v1')
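For reference, the same flattening over plain Scala collections (no Spark) is a one-liner; this sketch uses a hypothetical outer key of 1:

```scala
// The desired flattening over plain Scala collections; the outer key 1 is
// hypothetical sample data.
val nested = Map(1 -> Map("k" -> "v", "k1" -> "v1"))

val flattened = nested.toSeq.flatMap { case (outerKey, inner) =>
  inner.map { case (k, v) => (outerKey, k, v) }
}
// flattened contains (1, "k", "v") and (1, "k1", "v1")
```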
I tried the following, but it seems to fail with what looks like a concurrency issue. I have two worker nodes, and it duplicates each key -> value pair twice (I assume this is because I'm doing something wrong).
Assume I hold the map type in a case class "Record":
val rdd = sc.parallelize(1 to records.length)
val recordsIt = records.iterator
val res: RDD[(String, String, String)] = rdd.flatMap(f => {
val currItem = recordsIt.next()
val x: immutable.Iterable[(String, String, String)] = currItem.mapData.map(v => {
(currItem.identifier, v._1, v._2)
})
x
}).sortBy(r => r)
Is there a way to parallelize this job without running into the serious concurrency issues I suspect are occurring?
Sample duplicated output:
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
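A plausible mechanism for the duplication: the flatMap closure captures recordsIt, a driver-side iterator. Spark serializes the closure and ships a copy to each task, so every partition works from its own fresh copy of the iterator, starting at the beginning of the sequence. With two partitions, the first half of the records is read twice and the second half never, and sortBy then places the duplicates next to each other. A minimal plain-Scala sketch of that effect (names and sample data are hypothetical, Spark itself is not involved):

```scala
// Sketch of what closure serialization does to the captured iterator:
// each "task" deserializes its own copy, which restarts from the beginning.
val records = Seq("rec0", "rec1", "rec2", "rec3")

// Two partitions of the parallelized range 1 to 4:
val partitions = Seq(Seq(1, 2), Seq(3, 4))

val result = partitions.flatMap { partition =>
  val taskCopyOfIterator = records.iterator // fresh copy per task
  partition.map(_ => taskCopyOfIterator.next())
}
// result == Seq("rec0", "rec1", "rec0", "rec1"): the first half is read
// twice, the second half never, matching the duplicated output above.
```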
Solution
Spark's parallelize is not very efficient here to begin with (since you already keep the data in memory, it is much cheaper to just iterate over it locally), but a more idiomatic approach is a simple flatMap:
sc.parallelize(records.toSeq)
.flatMapValues(identity)
.map { case (k1, (k2, v)) => (k1, k2, v) }
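The snippet above assumes records is a Map[String, Map[String, String]]. A self-contained plain-Scala sketch of the same pipeline (flatten the inner maps, reshape to triples, then sort), with hypothetical sample data:

```scala
// Local-collections equivalent of the Spark pipeline above. `records` here is
// hypothetical sample data shaped like Map[String, Map[String, String]].
val records = Map(
  "id1" -> Map("CID" -> "B131", "ROD" -> "1919"),
  "id2" -> Map("CID" -> "339B", "ROD" -> "1986")
)

val res = records.toSeq
  .flatMap { case (k1, inner) => inner.map { case (k2, v) => (k1, k2, v) } }
  .sorted // mirrors the sortBy in the question
```

If records is instead a Seq of the "Record" case class, the same idea carries over to Spark: parallelize the records themselves (sc.parallelize(records)) and flatMap each record's mapData, so no shared mutable iterator is ever captured.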