首页 > 解决方案 > 为什么我需要使用函数签名扩展我的案例类才能在 Sparks rdd.mapPartition 中工作?

问题描述

我正在开发一个从 Kafka 读取数据的 Spark Streaming 应用程序(Direct API,2.3.2 版),但我想这更多是关于 Scala 本身的问题。

在 foreachRDD 块中,我将一个 RDD 转换为另一个 RDD。转换逻辑在我的案例类中定义

case class ExtractTableInfo(notUsed: Boolean = true)
{

  def apply(rdd: Iterator[(String, String)]): Iterator[(String, String))] =
    rdd.map { tuple  =>
      val (key, in) = tuple
      Try(JsonUtil.jsonToDescriptor(key)) match {
        case Failure(exception) => log.error("...")
        case Success(value)     => (key, in)
      }
    }
}

在哪里JsonUtil.jsonToDescriptor将字符串解析为 JSON。但是,当我申请时

val myExtractTableInfo = ExtractTableInfo(true)

inputDStream.foreachRDD { rdd =>
  ...  
  rdd.mapPartitions(myExtractTableInfo)
  ...
}

然后编译器抱怨:

Error:(71, 22) type mismatch;

只有当我扩展案例类 ExtractTableInfo 时

extends (Iterator[(String, String)] => Iterator[(String, String))])

代码编译并运行良好。

谁能解释这里发生了什么以及为什么需要使用 apply 方法中已经给出的相同签名来扩展案例类?

标签: scalaapache-sparkfunctional-programming

解决方案


查看Spark APImapPartitions期望Function1作为参数。类ExtractTableInfo根本没有那种特质

extends (Iterator[(String, String)] => Iterator[(String, String))])

这是语法糖

extends Function1[Iterator[(String, String)], Iterator[(String, String)]]

Apply 方法只是简单地覆盖()了操作符,但它并没有使它成为一个函数。Lambdas 可用于将其转换为函数,即这两种方法都可以在不实现 trait 的情况下工作:

rdd.mapPartitions(myExtractTableInfo(_))
rdd.mapPartitions(it => myExtractTableInfo(it))

换句话说,我们可以看一下值分配:

// won't compile without implementing function trait
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo
// compiles happily
val f: Iterator[(String, String)] => Iterator[(String, String)] = it => myExtractTableInfo(it)
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo(_)
// apply is just syntactic sugar for ()
val f: Iterator[(String, String)] => Iterator[(String, String)] = myExtractTableInfo.apply(_)

推荐阅读