首页 > 解决方案 > 迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext

问题描述

RDD在 Spark 中有一个自定义(使用 Scala),它负责缓冲来自数据库连接的流。

RDD调用 的计算函数时,我建立到数据库的连接,并在我返回的流上返回一个迭代器。

问题是我不确定在哪里关闭与数据库的连接,因为 Spark 似乎没有机制告诉我何时使用迭代器完成。

最初,我将清理代码放在返回的迭代器的“hasNext”中,在返回迭代器的末尾时进行清理。

这样做的问题是,当我执行有限制的查询时,Spark 不会迭代到迭代器的末尾,因此hasNext永远不会在正确的时间调用。

我可以通过以下代码片段确认这一点(这是应用限制时读取SparkPlan::getByteArrayRddmy 的路径):RDD

while (iter.hasNext && (n < 0 || count < n)) {
  val row = iter.next().asInstanceOf[UnsafeRow]
  out.writeInt(row.getSizeInBytes)
  row.writeToStream(out, buffer)
  count += 1
}

Spark 为自定义 RDD 提供了哪些机制来清理其资源?

标签: scalaapache-sparkapache-spark-sqlrdd

解决方案


TaskContext传递给compute函数的 有一个方法addTaskCompletionListener,其文档说:

添加一个(Java 友好的)侦听器以在任务完成时执行。这将在所有情况下调用 - 成功、失败或取消。将侦听器添加到已完成的任务将导致立即调用该侦听器。

HadoopRDD 的一个示例用途是注册回调以关闭输入流。

侦听器抛出的异常将导致任务失败。

在我看来,这正是您应该关闭数据库连接的地方!


推荐阅读