首页 > 解决方案 > 类型安全连接与 Spark 数据集的安全性比我预期的要低

问题描述

在尝试使用此解决方案Scala 中使用 Spark 数据集作为隐式执行类型化连接时,我遇到了一些我不理解的东西。

在下面的测试中,签名innerJoinis def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]),但我用f: Foo => Stringand来称呼它g: Bar => Int。我希望在编译时出现错误,但它编译得很好。这是为什么?

实际发生的是它编译得很好,并且java.lang.ClassNotFoundException: scala.Any当 Spark 尝试创建产品编码器时测试失败(((K, Foo),(K, Bar))我认为对于生成的元组)。我假设Any出现 和 的共同“父母IntString

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test

case class Foo(a: String)

case class Bar(b: Int)

class JoinTest extends TestNGSuite with Matchers {
  import JoinTest._

  @Test
  def testJoin(): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("test").getOrCreate()

    import spark.implicits._

    val ds1 = spark.createDataset(Seq(Foo("a")))
    val ds2 = spark.createDataset(Seq(Bar(123)))

    val jd = ds1.innerJoin(ds2)(_.a, _.b)

    jd.count shouldBe 0
  }
 }

object JoinTest {
  implicit class Joins[T](ds1: Dataset[T]) {
    def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)
     (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
     {
       val ds1_ = ds1.map(x => (f(x), x))
       val ds2_ = ds2.map(x => (g(x), x))
       ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
    }
   }
}

标签: scalaapache-spark

解决方案


你是正确的,Any被推断为Stringand的共同父级Int,因此被用作K. Function在输出类型中是协变的。所以 aFoo => String是 的有效子类Foo => Any

解决这种问题的常用方法是使用两个类型参数和一个隐式=:=. 例如:

def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
  (implicit eq: K1 =:= K2, e1: Encoder[(K2, T)], e2: Encoder[(K2, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
  {
    val ds1_ = ds1.map(x => (eq(f(x)), x))
    ... rest the same as before ...

推荐阅读