scala - 迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext
问题描述
我RDD
在 Spark 中有一个自定义(使用 Scala),它负责缓冲来自数据库连接的流。
当RDD
调用 的计算函数时,我建立到数据库的连接,并在我返回的流上返回一个迭代器。
问题是我不确定在哪里关闭与数据库的连接,因为 Spark 似乎没有机制告诉我何时使用迭代器完成。
最初,我将清理代码放在返回的迭代器的“hasNext”中,在返回迭代器的末尾时进行清理。
这样做的问题是,当我执行有限制的查询时,Spark 不会迭代到迭代器的末尾,因此hasNext
永远不会在正确的时间调用。
我可以通过以下代码片段确认这一点(这是应用限制时读取SparkPlan::getByteArrayRdd
my 的路径):RDD
while (iter.hasNext && (n < 0 || count < n)) {
val row = iter.next().asInstanceOf[UnsafeRow]
out.writeInt(row.getSizeInBytes)
row.writeToStream(out, buffer)
count += 1
}
Spark 为自定义 RDD 提供了哪些机制来清理其资源?
解决方案
TaskContext
传递给compute
函数的 有一个方法addTaskCompletionListener
,其文档说:
添加一个(Java 友好的)侦听器以在任务完成时执行。这将在所有情况下调用 - 成功、失败或取消。将侦听器添加到已完成的任务将导致立即调用该侦听器。
HadoopRDD 的一个示例用途是注册回调以关闭输入流。
侦听器抛出的异常将导致任务失败。
在我看来,这正是您应该关闭数据库连接的地方!
推荐阅读
- ios - 同步排队 异步操作
- datetime - 获取过去 2 个月和当前月份的数据
- ruby-on-rails - 在 Rails 5 中使用 smarter_csv 引用列号
- android - 带有 com.android.future.usb 的 Android Pie 模拟器
- rest - 使用路径变量测试 Chi 路线
- java - 错误 java.lang.ArrayIndexOutOfBoundsException:2
- c++ - C++ 将字节从 char* 传递到 BYTE*
- javascript - 使用ajax发送阿拉伯语的问题
- hadoop - hadoop集群中的物理内存是什么?
- wordpress - 如何在代码编辑器中使用 WordPress 短代码?