3 LEFT JOINs in Spark SQL with the Java API

Problem description

I have three Datasets, one from each of three tables:

Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");

I want to perform a triple LEFT JOIN on them and write the result to an output .parquet file.

The SQL JOIN statement would look something like this:

SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.field = one.field
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'

How can I do this with the Java API? Is it possible at all? I have written an example with a single join, but three of them seems hard.

PS: another join I already have working with the Java API is:

Dataset<TJOINED> ds_joined = ds_table1
                        .join(ds_table2,
                                JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4"))
                                        .seq(),
                                "inner")
                        .select("a lot of fields", ... "more fields")                                                               
                        .as(Encoders.bean(TJOINED.class));
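
Side note: the usingColumns form above only works when the key columns share a name on both sides. For a condition like four.field = one.otherfield in the SQL, Spark's join(Dataset<?>, Column, String) overload takes an explicit Column expression instead; a minimal sketch, reusing the question's placeholder names:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: the join condition is spelled out, so the key columns
// need not share a name. "field" and "otherfield" are the question's
// placeholders, not real columns.
Dataset<Row> ds_left = ds_table1.alias("one")
        .join(ds_table3.alias("four"),
                col("four.field").equalTo(col("one.otherfield")),
                "left");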

Thanks!

Tags: java, apache-spark, join, apache-spark-sql, left-join

Solution


Have you tried simply chaining the join statements? I don't write Java very often, so this is just a guess:

Dataset<TJOINED> ds_joined = ds_table1
    .join(
        ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table3,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table4,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .select(...)
    .as(Encoders.bean(TJOINED.class));
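
To produce the .parquet output mentioned in the question, a minimal write can then follow; a sketch, assuming the chained join above compiles into ds_joined (the output path is a placeholder):

import org.apache.spark.sql.SaveMode;

// Write the joined result as Parquet; the path is hypothetical.
ds_joined.write()
        .mode(SaveMode.Overwrite)  // replace any previous output
        .parquet("/path/to/output.parquet");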

Update: if my understanding is correct that ds_table3 and ds_table4 are the same Dataset joined on different fields, then maybe this updated answer will do what you want. It is given in Scala, since that is what I normally work in; a Java translation follows the example. Here is a complete working example:

import spark.implicits._

case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)

val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|  1|  2|  3|  4|  5|
//|  1|  3|  4|  5|  6|
//+---+---+---+---+---+

val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
//+---+---+-------+
//| f1| f2|vTable2|
//+---+---+-------+
//|  1|  2|     20|
//+---+---+-------+

val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
//+---+---+-------+
//| f3| f4|vTable3|
//+---+---+-------+
//|  3|  4|     20|
//|  3|  5|     50|
//+---+---+-------+

val result = one
.join(two, Seq("f1", "f2"), "left")
.join(three, Seq("f3", "f4"), "left")
.join(
  three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
  Seq("f3", "f5"),
  "left"
)
//result.show()
//+---+---+---+---+---+-------+-------+-------+
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
//+---+---+---+---+---+-------+-------+-------+
//|  3|  5|  4|  1|  2|     20|     20|     50|
//|  4|  6|  5|  1|  3|   null|   null|   null|
//+---+---+---+---+---+-------+-------+-------+
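
And since the question asks for the Java API, the same approach might translate roughly as follows; a sketch only, reusing the JavaConversions style from the question and assuming one, two and three are the Datasets built above:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConversions;

// The same three left joins in Java. The third join reuses `three`
// with renamed columns so that the join keys line up, exactly as in
// the Scala example above.
Dataset<Row> result = one
        .join(two, JavaConversions.asScalaBuffer(Arrays.asList("f1", "f2")).seq(), "left")
        .join(three, JavaConversions.asScalaBuffer(Arrays.asList("f3", "f4")).seq(), "left")
        .join(three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
                JavaConversions.asScalaBuffer(Arrays.asList("f3", "f5")).seq(),
                "left");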
