scala - 为什么我需要使用函数签名扩展我的案例类才能在 Sparks rdd.mapPartition 中工作?
问题描述
我正在开发一个从 Kafka 读取数据的 Spark Streaming 应用程序(Direct API,2.3.2 版),但我想这更多是关于 Scala 本身的问题。
在 foreachRDD 块中,我将一个 RDD 转换为另一个 RDD。转换逻辑在我的案例类中定义
case class ExtractTableInfo(notUsed: Boolean = true)
{
def apply(rdd: Iterator[(String, String)]): Iterator[(String, String))] =
rdd.map { tuple =>
val (key, in) = tuple
Try(JsonUtil.jsonToDescriptor(key)) match {
case Failure(exception) => log.error("...")
case Success(value) => (key, in)
}
}
}
在哪里JsonUtil.jsonToDescriptor
将字符串解析为 JSON。但是,当我申请时
val myExtractTableInfo = ExtractTableInfo(true)
inputDStream.foreachRDD { rdd =>
...
rdd.mapPartitions(myExtractTableInfo)
...
}
然后编译器抱怨:
Error:(71, 22) type mismatch;
只有当我扩展案例类 ExtractTableInfo 时
extends (Iterator[(String, String)] => Iterator[(String, String))])
代码编译并运行良好。
谁能解释这里发生了什么以及为什么需要使用 apply 方法中已经给出的相同签名来扩展案例类?
解决方案
查看Spark API,mapPartitions
期望Function1作为参数。类ExtractTableInfo
根本没有那种特质
extends (Iterator[(String, String)] => Iterator[(String, String))])
这是语法糖
extends Function1[Iterator[(String, String)], Iterator[(String, String)]]
Apply 方法只是简单地覆盖()
了操作符,但它并没有使它成为一个函数。Lambdas 可用于将其转换为函数,即这两种方法都可以在不实现 trait 的情况下工作:
rdd.mapPartitions(myExtractTableInfo(_))
rdd.mapPartitions(it => myExtractTableInfo(it))
换句话说,我们可以看一下值分配:
// won't compile without implementing function trait
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo
// compiles happily
val f: Iterator[(String, String)] => Iterator[(String, String)] = it => myExtractTableInfo(it)
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo(_)
// apply is just syntactic sugar for ()
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo.apply(_)
推荐阅读
- html - 如果他们有超过 x 个孩子,则将 css 应用于父母
- c# - 如何从 Model 类中获取对 HtmlHelper 的引用?
- mysql - 在数据库架构中存储不同类型的文档
- c++ - 根据 C++ 中的输入创建文件并为其命名
- node.js - 如何从节点中的 express 访问 .env 变量?
- r - 如何对相似的字符串进行分组并为 R 中的每个组创建索引变量?
- python - Pandas 数据框列需要作为输入传递给另一个函数
- sql - 如何将函数从熊猫时间重写为sql?
- spring - 使用 spring-boot:run 时 Spring Boot 无法解析属性变量
- leaflet - 如何使用 Bootleaf / esri-leaflet 查询具有多边形层的点?