Spark AccumulatorV2 with HashMap

Problem description

I'm trying to create a custom AccumulatorV2 using HashMaps: the input will be a HashMap and the output will be a map of HashMaps.

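My intention is to have K -> (K1, V), where the value gets incremented. I'm confused by the Scala syntax for overriding AccumulatorV2 with a Map. Has anyone had any luck with this?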

class CustomAccumulator extends AccumulatorV2[java.util.Map[String, String], java.util.Map[String,java.util.Map[String, Double]]]

Tags: apache-spark

Solution


I'm assuming this is the scenario that needs to be implemented.

Input: HashMap<String, String>

Output:

The accumulator should output a HashMap<String, HashMap<String, Double>>, where the inner HashMap holds, for each key, the count of every value seen with that key.

Example:

Input (the following HashMaps are added to the accumulator):

Input HashMap1 -> {"key1", "value1"}, {"key2", "value1"}, {"key3", "value3"}
Input HashMap2 -> {"key1", "value1"}, {"key2", "value1"}
Input HashMap3 -> {"key2", "value1"}

Output:

{"key1", {"value1", 2}}, {"key2", {"value1", 3}}, {"key3", {"value3", 1}}
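To check the expected output above without a cluster, here is a Spark-free sketch in plain Scala. The object `ValueCounts` and its `add`/`merge` helpers are hypothetical names chosen to mirror the accumulator's `add` and `merge`; it uses immutable Scala maps instead of `java.util.HashMap`. Besides reproducing the output from the three input maps, it splits the inputs across two "partitions", counts each one separately and merges the results, which is what Spark does with per-task copies of an accumulator.

```scala
// Spark-free sketch of the counting semantics (hypothetical helper names)
object ValueCounts {
  // key -> (value -> number of occurrences)
  type Counts = Map[String, Map[String, Double]]

  // Mirrors AccumulatorV2.add: every (key, value) pair bumps that value's count by 1
  def add(acc: Counts, input: Map[String, String]): Counts =
    input.foldLeft(acc) { case (counts, (key, value)) =>
      val inner = counts.getOrElse(key, Map.empty[String, Double])
      counts.updated(key, inner.updated(value, inner.getOrElse(value, 0.0) + 1.0))
    }

  // Mirrors AccumulatorV2.merge: counts for the same (key, value) are summed
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (counts, (key, otherInner)) =>
      val inner = counts.getOrElse(key, Map.empty[String, Double])
      val mergedInner = otherInner.foldLeft(inner) { case (m, (value, n)) =>
        m.updated(value, m.getOrElse(value, 0.0) + n)
      }
      counts.updated(key, mergedInner)
    }

  val hashMap1 = Map("key1" -> "value1", "key2" -> "value1", "key3" -> "value3")
  val hashMap2 = Map("key1" -> "value1", "key2" -> "value1")
  val hashMap3 = Map("key2" -> "value1")

  // All three inputs added to a single accumulator
  val sequential: Counts = Seq(hashMap1, hashMap2, hashMap3).foldLeft(Map.empty: Counts)(add)

  // The same inputs split over two partitions, counted separately, then merged
  val partition1: Counts = add(Map.empty, hashMap1)
  val partition2: Counts = Seq(hashMap2, hashMap3).foldLeft(Map.empty: Counts)(add)
  val merged: Counts = merge(partition1, partition2)
}
```

`ValueCounts.sequential` and `ValueCounts.merged` both equal the output listed above, so summing counts in `merge` gives the same answer no matter how the inputs are partitioned.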

The code:

import java.util.{HashMap, Map}

import scala.collection.JavaConverters._
import org.apache.spark.util.AccumulatorV2

class CustomAccumulator extends AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]] {

  // key -> (value -> number of times that value was seen for the key)
  private var hashmap: Map[String, Map[String, Double]] = new HashMap[String, Map[String, Double]]()

  override def isZero: Boolean = hashmap.isEmpty

  // merge() copies every count into fresh inner maps, so the copy shares no mutable state with this instance
  override def copy(): AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]] = {
    val customAccumulatorCopy = new CustomAccumulator()
    customAccumulatorCopy.merge(this)
    customAccumulatorCopy
  }

  override def reset(): Unit = {
    hashmap = new HashMap[String, Map[String, Double]]()
  }

  // Every (key, value) pair of the input map increments the count of that value under that key by 1
  override def add(v: Map[String, String]): Unit = {
    v.asScala.foreach { case (key, innerKey) => increment(key, innerKey, 1.0) }
  }

  // Sums the other accumulator's counts into this one
  override def merge(other: AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]]): Unit = {
    other.value.asScala.foreach { case (key, otherInner) =>
      otherInner.asScala.foreach { case (innerKey, count) => increment(key, innerKey, count) }
    }
  }

  override def value: Map[String, Map[String, Double]] = hashmap

  // A plain method instead of java.util.function.BiFunction fields: anonymous BiFunction
  // instances are not Serializable, and the accumulator's fields are serialized with each task
  private def increment(key: String, innerKey: String, delta: Double): Unit = {
    var inner = hashmap.get(key)
    if (inner == null) {
      inner = new HashMap[String, Double]()
      hashmap.put(key, inner)
    }
    inner.put(innerKey, inner.getOrDefault(innerKey, 0.0) + delta)
  }
}
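On the driver, after registering the accumulator (for example with `sc.register(acc, "valueCounts")`, where the name is arbitrary) and calling `acc.add(...)` inside an action such as `foreach`, `acc.value` returns the nested `java.util.Map`. If the rest of the program works with Scala collections, a small conversion like the one below may help. `AccumulatorValues.toScala` is a hypothetical helper, not part of Spark; it produces immutable Scala maps, so the result no longer changes if the accumulator is updated later.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical helper: converts the nested java.util.Map returned by
// CustomAccumulator.value into immutable Scala maps (a snapshot of the counts)
object AccumulatorValues {
  def toScala(value: JMap[String, JMap[String, Double]]): Map[String, Map[String, Double]] =
    value.asScala.map { case (key, inner) => key -> inner.asScala.toMap }.toMap
}
```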

Thanks!

