java - Triple LEFT JOIN in Spark SQL using the Java API
Problem description
I have three Datasets coming from three tables:
Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");
I want to perform a triple left join on them and write the result to an output .parquet file. The equivalent SQL JOIN statement would look like this:
SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.field = one.field
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'
How can I do this with the Java API? Is it even possible? I have written an example with a single join, but three joins seems hard.
PS: my other join with the Java API is:
Dataset<TJOINED> ds_joined = ds_table1
    .join(ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4"))
            .seq(),
        "inner")
    .select("a lot of fields", ... "more fields")
    .as(Encoders.bean(TJOINED.class));
Thanks!
Solution
Have you tried chaining the join statements? I don't write Java very often, so this is just a guess:
Dataset<TJOINED> ds_joined = ds_table1
    .join(
        ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table3,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table4,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .select(...)
    .as(Encoders.bean(TJOINED.class));
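Note that joining on a list of common column names only works when the join keys share the same name on both sides. Since the original SQL joins TABLE3 twice on different columns, the joins can instead take explicit Column expressions. A hedged sketch, reusing the placeholder field names ("field", "otherfield", "whatever") and the aliases from the question:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Placeholder column names are taken from the question; substitute the real
// ones. Re-aliasing bbdd_three lets the same table be joined twice without
// ambiguous column references.
Dataset<Row> joined = bbdd_one
    .join(bbdd_two, col("TABLE1.field").equalTo(col("TABLE2.field")), "left")
    .join(bbdd_three.alias("three"),
        col("three.field").equalTo(col("TABLE1.field")), "left")
    .join(bbdd_three.alias("four"),
        col("four.field").equalTo(col("TABLE1.otherfield")), "left")
    .where(col("TABLE1.field").equalTo("whatever"));
```

The result could then be narrowed with `.select(...)` and written out with `joined.write().parquet(...)`, as the question intends.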
Update: if my understanding is correct, ds_table3 and ds_table4 are the same Dataset, joined on different fields. In that case perhaps this updated answer, given in Scala since that is what I am used to, will achieve what you want. Here is a complete working example:
import spark.implicits._
case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)
val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//| 1| 2| 3| 4| 5|
//| 1| 3| 4| 5| 6|
//+---+---+---+---+---+
val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
//+---+---+-------+
//| f1| f2|vTable2|
//+---+---+-------+
//| 1| 2| 20|
//+---+---+-------+
val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
//+---+---+-------+
//| f3| f4|vTable3|
//+---+---+-------+
//| 3| 4| 20|
//| 3| 5| 50|
//+---+---+-------+
val result = one
  .join(two, Seq("f1", "f2"), "left")
  .join(three, Seq("f3", "f4"), "left")
  .join(
    three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
    Seq("f3", "f5"),
    "left"
  )
//result.show()
//+---+---+---+---+---+-------+-------+-------+
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
//+---+---+---+---+---+-------+-------+-------+
//| 3| 5| 4| 1| 2| 20| 20| 50|
//| 4| 6| 5| 1| 3| null| null| null|
//+---+---+---+---+---+-------+-------+-------+
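The Scala example above should translate fairly directly to the Java API. A sketch, assuming the same `one`/`two`/`three` Datasets and the `JavaConversions` pattern already used in the question:

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConversions;

// Same three chained left joins as the Scala example; the third join renames
// f4 -> f5 and vTable3 -> vTable4 so TABLE3 can be reused as "table 4".
Dataset<Row> result = one
    .join(two, JavaConversions.asScalaBuffer(Arrays.asList("f1", "f2")).seq(), "left")
    .join(three, JavaConversions.asScalaBuffer(Arrays.asList("f3", "f4")).seq(), "left")
    .join(
        three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
        JavaConversions.asScalaBuffer(Arrays.asList("f3", "f5")).seq(),
        "left");
```

Note that `scala.collection.JavaConversions` is deprecated on newer Scala versions; `scala.collection.JavaConverters` offers an equivalent, non-deprecated conversion.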