首页 > 解决方案 > 多个测试类的 MiniDFS 集群设置抛出 java.net.BindException: Address already in use

问题描述

我正在为从 hdfs 文件和 spark 目录读取/写入数据的 spark 代码编写单元测试用例。为此,我创建了一个单独的特征来提供 minidfs 集群的初始化,并且我在spark.sql.warehouse.dir创建 SparkSession 对象时使用生成的 hdfs uri 值。这是它的代码 -

trait TestSparkSession extends BeforeAndAfterAll {
  self: Suite =>

  var hdfsCluster: MiniDFSCluster = _

  def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"

  def withLocalSparkSession(tests: SparkSession => Any): Any = {
    val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
    val conf = new HdfsConfiguration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder = new MiniDFSCluster.Builder(conf)
    hdfsCluster = builder.nameNodePort(9000)
      .manageNameDfsDirs(true)
      .manageDataDfsDirs(true)
      .format(true)
      .build()
    hdfsCluster.waitClusterUp()

    val testSpark = SparkSession
      .builder()
      .master("local")
      .appName("Test App")
      .config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
      .getOrCreate()
    tests(testSpark)  
  }

  def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)

  override def afterAll(): Unit = stopHdfs()

}

在编写测试时,我继承了这个特性,然后编写测试用例,例如 -

class SampleSpec extends FunSuite with TestSparkSession {
     withLocalSparkSession {
         testSpark =>
         import testSpark.implicits._

         // Test 1 Here
         // Test 2 Here
     }
}

当我一次运行一个测试类时,一切正常。但是当一次运行它们时,它会抛出java.net.BindException: Address already in use. 这应该意味着在执行下一组测试时,已经创建的 hdfsCluster 尚未关闭。这就是它无法创建另一个绑定到同一端口的原因。但后来在 afterAll() 中我停止了 hfdsCluster。

我的问题是我可以共享 hdfs 集群和 spark 会话的单个实例,而不是每次都初始化它吗?我试图在方法之外提取初始化,但它仍然抛出相同的异常。即使我不能共享它,我怎样才能正确停止我的集群并在下一次测试类执行时重新初始化它?

另外,请让我知道我编写使用 SparkSession 和 HDFS 存储的“单元”测试用例的方法是否正确。

任何帮助将不胜感激。

标签: scalaapache-sparkapache-spark-sqlhdfsscalatest

解决方案


我通过在伴生对象中创建 hdfs 集群来解决它,以便它为所有测试套装创建它的单个实例。


推荐阅读