首页 > 解决方案 > 如何在 Spark UDAF 中实现 fastutils 映射?

问题描述

我正在构建一个 Spark UDAF,我将中间数据存储在一个 fastutils 映射中。架构如下所示:

def bufferSchema = new StructType().add("my_map_col", MapType(StringType, IntegerType))

我初始化没有问题:

def initialize(buffer: MutableAggregationBuffer) = {
   buffer(0) = new Object2IntOpenHashMap[String]()
}

当我尝试更新时出现问题:

def update(buffer: MutableAggregationBuffer, input: Row) = { 
  val myMap = buffer.getAs[Object2IntOpenHashMap[String]](0)
  myMap.put(input.getAs[String](0), 1)
  buffer(0) = myMap
}

收到以下错误:

Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap

有什么办法可以使这项工作?

标签: scalaapache-sparkfastutil

解决方案


有什么办法可以使这项工作?

并不真地。这个

buffer.getAs[Object2IntOpenHashMap[String]](0)

相当于

buffer.get(0).asInstanceOf[Object2IntOpenHashMap[String]]]

外部MapType类型scala.collection.Map为.

在实践中,无论如何它都是一个死胡同——UserDefinedAggregate函数在每次调用时都会对数据进行完整的复制。你可能会有更好的运气Aggregator(如链接的问题)。


推荐阅读