scala - 如何在 Spark UDAF 中实现 fastutils 映射?
问题描述
我正在构建一个 Spark UDAF,我将中间数据存储在一个 fastutils 映射中。架构如下所示:
def bufferSchema = new StructType().add("my_map_col", MapType(StringType, IntegerType))
我初始化没有问题:
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = new Object2IntOpenHashMap[String]()
}
当我尝试更新时出现问题:
def update(buffer: MutableAggregationBuffer, input: Row) = {
val myMap = buffer.getAs[Object2IntOpenHashMap[String]](0)
myMap.put(input.getAs[String](0), 1)
buffer(0) = myMap
}
收到以下错误:
Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap
有什么办法可以使这项工作?
解决方案
有什么办法可以使这项工作?
并不真地。这个
buffer.getAs[Object2IntOpenHashMap[String]](0)
相当于
buffer.get(0).asInstanceOf[Object2IntOpenHashMap[String]]]
外部MapType
类型scala.collection.Map
为.
在实践中,无论如何它都是一个死胡同——UserDefinedAggregate
函数在每次调用时都会对数据进行完整的复制。你可能会有更好的运气Aggregator
(如链接的问题)。
推荐阅读
- python - Pandas:结合实际值和预测值
- snakemake - Snakemake 在规则中使用相同的输入和输出
- python - 使用多文件夹和文件和 TensorFlow 部署 exe Kivy
- mysql - 带有 MySQL 和 Maria DB 提交和回滚问题的 Spring Boot JDBCTemplate
- java - 如何确保 Java Web Start 应用程序的单个实例正在客户端计算机上运行?
- javascript - 递归 axios 调用
- python - pandas:在组内使用条件进行迭代
- c# - 如何在 ASP.NET Core 重写模块中使用 {PATH_INFO}?
- android - Angular/Cordova 应用程序 html5 视频标签无法在 Android 上播放
- dependencies - 如何在理解中一次创建多个蝴蝶图?