apache-spark - Spark AccumulatorV2 with a HashMap

Problem

I am trying to create a custom AccumulatorV2 backed by hash maps: the input will be a hash map, and the output a map of maps. My intent is to have K -> (K1, V), where the value gets incremented. I am confused by the Scala syntax for an AccumulatorV2 over Maps; has anyone had any luck with this?
class CustomAccumulator extends AccumulatorV2[java.util.Map[String, String], java.util.Map[String,java.util.Map[String, Double]]]
Solution

I assume this is the scenario that needs to be implemented.

Input: HashMap<String, String>

Output: a HashMap<String, HashMap<String, Double>>, where the inner hash map holds the count of each value observed for the corresponding key.

Example:

Input (the following HashMaps are added to the accumulator):
Input HashMap1 -> {"key1", "value1"}, {"key2", "value1"}, {"key3", "value3"}
Input HashMap2 -> {"key1", "value1"}, {"key2", "value1"}
Input HashMap3 -> {"key2", "value1"}
Output:
{"key1", {"value1", 2}}, {"key2", {"value1", 3}}, {"key3", {"value3", 1}}
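The counting behavior can be sketched independently of Spark with plain Scala collections; `addAll` below is a hypothetical helper of ours that mirrors the accumulator's `add` logic, and the three input maps reproduce the expected output:

```scala
import scala.collection.mutable

object CountSketch {
  // Increment the count of `value` under `key`, mirroring the accumulator's add().
  def addAll(acc: mutable.Map[String, mutable.Map[String, Double]],
             input: Map[String, String]): Unit =
    input.foreach { case (k, v) =>
      val inner = acc.getOrElseUpdate(k, mutable.Map.empty[String, Double])
      inner(v) = inner.getOrElse(v, 0.0) + 1.0
    }

  def main(args: Array[String]): Unit = {
    val acc = mutable.Map.empty[String, mutable.Map[String, Double]]
    Seq(
      Map("key1" -> "value1", "key2" -> "value1", "key3" -> "value3"),
      Map("key1" -> "value1", "key2" -> "value1"),
      Map("key2" -> "value1")
    ).foreach(addAll(acc, _))
    // value1 was seen twice under key1, three times under key2;
    // value3 was seen once under key3.
    println(acc)
  }
}
```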
The code:
import java.util.{HashMap, Map}
import java.util.function.BiFunction
// Implicit Java/Scala collection conversions (deprecated in newer Scala;
// scala.collection.JavaConverters is the modern alternative).
import scala.collection.JavaConversions._
import org.apache.spark.util.AccumulatorV2
class CustomAccumulator extends AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]] {

  private var hashmap: Map[String, Map[String, Double]] = new HashMap[String, Map[String, Double]]()

  // The accumulator is "zero" when nothing has been counted yet.
  override def isZero: Boolean = hashmap.isEmpty

  // Deep-copy the inner maps; merging `this` into the copy would make the
  // two accumulators share inner-map references and corrupt each other.
  override def copy(): AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]] = {
    val customAccumulatorCopy = new CustomAccumulator()
    this.hashmap.foreach(kv =>
      customAccumulatorCopy.hashmap.put(kv._1, new HashMap[String, Double](kv._2))
    )
    customAccumulatorCopy
  }

  override def reset(): Unit = {
    this.hashmap = new HashMap[String, Map[String, Double]]()
  }

  // For each (key, value) pair, increment the count of `value` in the inner
  // map stored under `key`.
  override def add(v: Map[String, String]): Unit = {
    val unitValue: Double = 1
    v.foreach { kv =>
      if (this.hashmap.containsKey(kv._1)) {
        this.hashmap.get(kv._1).merge(kv._2, unitValue, addFunction)
      } else {
        val innerMap: Map[String, Double] = new HashMap[String, Double]()
        innerMap.put(kv._2, unitValue)
        this.hashmap.put(kv._1, innerMap)
      }
    }
  }

  // Combine another accumulator's state into this one, key by key. The inner
  // maps are copied so the two accumulators never share references.
  override def merge(otherAccumulator: AccumulatorV2[Map[String, String], Map[String, Map[String, Double]]]): Unit = {
    otherAccumulator.value.foreach { kv =>
      this.hashmap.merge(kv._1, new HashMap[String, Double](kv._2), mergeMapsFunction)
    }
  }

  override def value: Map[String, Map[String, Double]] = this.hashmap

  // Merges two inner maps by summing the counts of matching values.
  val mergeMapsFunction = new BiFunction[Map[String, Double], Map[String, Double], Map[String, Double]] {
    override def apply(oldMap: Map[String, Double], newMap: Map[String, Double]): Map[String, Double] = {
      newMap.foreach(kv => oldMap.merge(kv._1, kv._2, addFunction))
      oldMap
    }
  }

  // Adds two counts.
  val addFunction = new BiFunction[Double, Double, Double] {
    override def apply(oldValue: Double, newValue: Double): Double = oldValue + newValue
  }
}
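Driver-side usage can be sketched as follows. This is illustrative, not from the original answer: it assumes the CustomAccumulator class above is on the classpath, and the local-mode SparkContext setup, the app name, and the accumulator name are all our own placeholders. The accumulator must be registered with the SparkContext before tasks call `add` on it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions.mapAsJavaMap

// Illustrative local-mode setup; in a real job the context already exists.
val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[*]"))

val acc = new CustomAccumulator()
sc.register(acc, "valueCounts") // register before using it on executors

val data = Seq(
  Map("key1" -> "value1", "key2" -> "value1", "key3" -> "value3"),
  Map("key1" -> "value1", "key2" -> "value1"),
  Map("key2" -> "value1")
)

// Each task adds its map; Spark merges per-task copies back on the driver.
sc.parallelize(data).foreach(m => acc.add(mapAsJavaMap(m)))

println(acc.value) // the driver sees the merged counts
sc.stop()
```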
Thanks!!!