scala - 不支持没有相等谓词的流流连接
问题描述
我正在使用 Spark 2.3 并试图加入两个数据流。我的左右流都有一个数组。只有当右流数组是左流数组的子集时,我才想加入这两个流。
例如,我的 streamA 看起来像这样:
StreamA:
|---|------|---------------------|-----------|
|id | dept | employeesInMeetings | DateTime |
|---|------|---------------------|-----------|
| 1 | sales| [John] | 7/2 14:00 |
| 2 | mktg | [Adam, Mike] | 7/2 12:30 |
| 3 | hr | [Rick, Jill, Andy] | 7/2 14:00 |
|---|------|---------------------|-----------|
我的 streamB 如下所示:
StreamB:
|--------------|--------------|----------|
|employees | confRooms | DateTime |
|--------------|--------------|----------|
| [John, Jane] | A | 7/2 14:00|
| [Adam, Mike] | C | 7/2 12:30|
| [Jill, Andy] | B | 7/2 14:00|
|--------------|--------------|----------|
我只关心在同一个会议中来自同一部门的员工。因此,作为交集的结果,我的结果流需要如下所示:
|---|------|---------------------|-----------|----------|
|id | dept | employeesInMeetings | DateTime | confRoom |
|---|------|---------------------|-----------|----------|
| 2 | mktg | [Adam, Mike] | 7/2 12:30 | C |
| 3 | hr | [Rick, Jill, Andy] | 7/2 14:00 | B |
|---|------|---------------------|-----------|----------|
我创建了一个 UDF 来进行相交:
val arrayIntersect = udf((leftArr: Array[String], rightArr: Array[String]) => {
import spark.implicits._
if(leftArr.intersect(rightArr.toSeq).length == rightArr.size){
true
} else {
false
}
})
并尝试按如下方式使用它:
streamA.joinWith(streamB, expr("arrayIntersect(leftArr, rightArr) AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))
但是,我收到错误:
org.apache.spark.sql.AnalysisException: Stream stream joins without equality predicate is not supported;
有人知道这里是否有解决方法吗?任何帮助将不胜感激!谢谢!
解决方案
不幸的是,在流连接中没有解决方法:(
我们确实需要一个相等谓词,因为我们使用它来使用流对称哈希连接算法执行连接——两个流都使用公共密钥进行分区,以便来自两个流的相关记录最终在同一个分区中。
推荐阅读
- google-sheets - 如何使用条件格式在 Google 表格的 1 列中突出显示部分匹配的重复项
- html - 在有两个标题的表格中,主标题不会在滚动时保持粘性
- python - 如何改变盒子的不透明度(cv2.rectangle)?
- java - 如何修复:java.lang.ClassCastException:androidx.appcompat.widget.AppCompatTextView 无法转换为 android.widget.Button
- python-3.x - 循环遍历 csv 数据
- ios - defer 语句的不同行为
- node.js - 如何使用 ckeditor 在 mongoose-node 应用程序中上传图像。文件将上传到 s3
- qt - 使用 QMediaplayer 从缓冲区播放视频并在视频播放时附加缓冲区
- c# - 从字符串中删除 HTML -- 注释
- ios - 我可以使用 Siri Shortcut 更改变量的值吗?