首页 > 解决方案 > scala - 使用地图列表传播rdd

问题描述

我想使用列表地图传播 rdd。

输入样本是

Log("key1", "key2", "key3", Map(tk1 -> tv1, tk2 -> tv2, tk3 -> tv3))

我想要的输出样本是

RDD[(String, String, String, String, String)]
("key1", "key2", "key3", "tk1", "tv1")
("key1", "key2", "key3", "tk2", "tv2")
("key1", "key2", "key3", "tk3", "tv3")

最后,我想做如下图的reduce操作。但它不起作用。

val mapCnt = logs.map(log => {
  log.textMap.foreach { tmap =>
    var tkey = tmap._1
    var tvalue = tmap._2
  }
  ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)

这是我使用的输入对象。

case class Log(
            val key1: String,
            val key2: String,
            val key3: String,
            val TextMap: Map[String, String]
          ) 

我该如何改变这个?

谢谢您的帮助。

标签: scalaapache-sparkforeachrdd

解决方案


您计算结果foreach并立即丢弃它们。此外,这些值超出了范围。最好在flatMap这里使用。

val mapCnt = logs.flatMap(log => {
  for { 
    (tkey, tvalue) <- tmap
  } yield ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)

推荐阅读