java - Flink Scala - 比较方法违反了它的一般合同
问题描述
我正在 Flink 中编写一个项目,该项目涉及在批处理数据上流式传输一组查询点并执行完整的顺序扫描以查找最近的邻居。对单个 Float 值进行简单排序操作会引发违反一般合约错误。主要方法定义为:
object StreamingDeCP{
var points: Vector[Point] = _
def main(args: Array[String]): Unit = {
val queryPointsVec: Vector[Point] = ... // Read from file
val pointsVec: Vector[Point] = ... // Read from file
val streamEnv: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
val queryPoints = streamEnv.fromCollection(queryPointsVec)
points = pointsVec
queryPoints.map(new StreamingSequentialScan)
streamEnv.execute("StreamingDeCP")
}
final class StreamingSequentialScan
extends MapFunction[Point, (Point, Vector[Point])] {
def map(queryPoint: Point): (Point, Vector[Point]) = {
val nn = points
.map{ _.eucDist(queryPoint) }
.sorted
(queryPoint, nn)
}
}
}
类Point
和伴随对象是:
case class Point(pointID: Long,
descriptor: Vector[Float]) extends Serializable {
var distance: Float = Float.MaxValue
def eucDist(that: Point): Point = {
// Simple arithmetic to calculate and set the distance variable
}
}
object Point{
implicit def orderByDistance[A <: Point]: Ordering[A] =
Ordering.by(_.distance)
}
为了查明原因,这里有一些关于我尝试过的事情的注释:
- 断言所有
distance
值都在 Float.MaxValue 和 Float.MinValue 之间,并且不存在负零值 - 断言在同一个排序操作中没有重复
distance
的变量(我的用例允许这样做,但我想我会检查它以防万一) - 将浮点数转换为整数值并改为对这些值进行排序
- 添加了显式排序
Point
而不是使用隐式 - 对唯一
pointID
而不是排序distance
,这有效,但对于这个问题的上下文是无用的。
我还注意到,执行相同的代码并不总是可靠地重现错误。我正在Vector[Points]
以完全确定的方式阅读,因此导致这种行为的唯一可能原因必须是 Flink 调度程序或排序方法中的一些有状态计算。
关于同一主题的其他帖子似乎涉及自定义比较器中的错过场景,但这应该是对单个浮点值的简单排序操作,所以我不知道是什么导致了这个问题。
解决方案
我不熟悉 Flink,但我没有任何理由假设它会以顺序单线程的方式执行每一个令人尴尬的并行任务。 MapFunction
由于您的Point
contains var
s,并且这些s 在 s 的方法中var
发生了变异,因此代码必须以“比较方法违反其一般合同”而失败 -每当使用 parallelism 执行时都会出现异常。map
MapFunction
MapFunction
!= 1
为了避免函数内部的任何副作用map
,您可以修改代码如下:
- 从 中删除任何
var
smain
,使其points
成为不可变的val
。 - 删除任何类型的
var
sPoint
实现方法
def eucDist(other: Point): Double
这只是计算到另一个点的距离(不改变任何东西)。
使用
sortBy
:val nn = points.sortBy(_.eucDist(queryPoint))
或者,如果您想避免在排序期间多次重新计算欧几里得距离,请预先计算一次距离,排序,然后丢弃距离:
val nn = points.map(p => (p, p.eucDist(queryPoint))).sortBy(_._2).map(_._1)
推荐阅读
- python - \在python中做了什么?
- c# - C#/XML/Parsing/比较不同程序保存的文件位置,一种在uri中使用localhost
- tensorflow - 训练SSD-MOBILENET V1,损失不减
- excel - read_excel 在 index_col=none 时使用 pandas 创建索引
- python-3.x - 如何使用 mapbox 创建一个等值线地图并在国家一级进行绘图?
- swift - 快速解码对象数组
- python - 循环时将数据框的每一列保存在新的数据框中
- javascript - 如何使用 styled-components 更改其他组件中单个组件的样式?
- javascript - 结果低于输入
- javascript - 仅从 Web 应用程序和 localhost 访问 s3 文件