首页 > 解决方案 > Scala 嵌套映射到 Spark RDD

问题描述

我正在尝试将映射列表 (Seq[Map[String, Map[String, String]]) 转换为 RDD 表/元组,其中映射中的每个键-> 值对都平面映射到具有外部的元组地图的钥匙。例如

Map(
 1 -> Map('k' -> 'v', 'k1' -> 'v1')
)  

变成

(1, 'k', 'v')
(1, 'k1', 'v1')

我尝试了以下方法,但似乎在并发问题上失败了。我有两个工作节点,它重复了键 - > 值两次(我认为这是因为我做错了)

假设我将地图类型保存在案例类“记录”中

  val rdd = sc.parallelize(1 to records.length)
    val recordsIt = records.iterator
      val res: RDD[(String, String, String)] = rdd.flatMap(f => {
        val currItem = recordsIt.next()
        val x: immutable.Iterable[(String, String, String)] = currItem.mapData.map(v => {
          (currItem.identifier, v._1, v._2)
        })
        x
      }).sortBy(r => r)

有没有办法在不遇到严重的并发问题的情况下并行化这项工作(我怀疑正在发生?

示例重复输出

(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)

标签: scalaapache-sparkhadoopapache-spark-sqlrdd

解决方案


Sparkparallelize从一开始就非常高效(因为您已经开始将数据存储在内存中,因此仅在本地迭代它的成本要低得多),但是更惯用的方法是简单的flatMap

sc.parallelize(records.toSeq)
  .flatMapValues(identity)
  .map { case (k1, (k2, v)) => (k1, k2, v) } 

推荐阅读