ERROR: java.lang.IllegalStateException: User did not initialize spark context

Problem description

Scala version: 2.11.12

Spark version: 2.4.0

emr-5.23.0

I get the following exception when running the command below on an Amazon EMR cluster:

spark-submit --class etl.SparkDataProcessor --master yarn --deploy-mode cluster --conf spark.yarn.appMasterEnv.ETL_NAME=foo --conf spark.yarn.appMasterEnv.ETL_SPARK_MASTER=yarn --conf spark.yarn.appMasterEnv.ETL_AWS_ACCESS_KEY_ID=123 --conf spark.yarn.appMasterEnv.ETL_AWS_SECRET_ACCESS_KEY=abc MY-Tool.jar

Exception

ERROR ApplicationMaster: Uncaught exception: 
java.lang.IllegalStateException: User did not initialize spark context!
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:485)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:773)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:772)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:797)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

How I create my Spark session (where sparkMaster = yarn):

import org.apache.log4j.Logger // assuming log4j, which Spark ships with; the original snippet omits its imports
import org.apache.spark.sql.SparkSession

lazy val spark: SparkSession = {
    val logger: Logger = Logger.getLogger("etl")
    val sparkAppName = EnvConfig.ETL_NAME
    val sparkMaster = EnvConfig.ETL_SPARK_MASTER

    val sparkInstance = SparkSession
      .builder()
      .appName(sparkAppName)
      .master(sparkMaster)
      .getOrCreate()

    val hadoopConf = sparkInstance.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3a.access.key", EnvConfig.ETL_AWS_ACCESS_KEY_ID)
    hadoopConf.set("fs.s3a.secret.key", EnvConfig.ETL_AWS_SECRET_ACCESS_KEY)

    logger.info("Created My SparkSession")
    logger.info(s"Spark Application Name: $sparkAppName")
    logger.info(s"Spark Master: $sparkMaster")

    sparkInstance
  }

UPDATE:

I determined that, due to the application logic, in certain cases we never initialized the Spark session at all. Because of this, it seems that when the cluster terminates it also tries to do something with the session (perhaps close it) and therefore fails with the exception above. Now that I have fixed that issue, the application runs but never actually completes; in cluster mode it currently appears to hang at this particular Spark call:

val data: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(s"s3://$csvPath/$fileKey")
      .toDF()

20/03/16 18:38:35 INFO Client: Application report for application_1584324418613_0031 (state: RUNNING)
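
For reference: since spark above is a lazy val, nothing is created until some code path actually touches it. A minimal sketch of forcing the session to be initialized up front in the driver (runEtl is a hypothetical placeholder for the real job logic, not from the original code) could look like this:

object SparkDataProcessor {

  // ... lazy val spark as defined above ...

  def main(args: Array[String]): Unit = {
    // Touch the lazy val immediately so a SparkContext exists on every code path;
    // otherwise yarn-cluster mode fails with "User did not initialize spark context!"
    // whenever the application logic never happens to use the session.
    val session = spark
    try {
      runEtl(session) // hypothetical placeholder for the actual ETL logic
    } finally {
      session.stop() // stop the session explicitly instead of relying on YARN shutdown
    }
  }
}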

Tags: scala, apache-spark, sbt, amazon-emr

Solution


AFAIK, EnvConfig.ETL_AWS_ACCESS_KEY_ID and ETL_AWS_SECRET_ACCESS_KEY are not getting populated, since a SparkSession cannot be instantiated with null or empty values. Try printing and debugging those values.

Also, properties passed as --conf spark.xxx should be read back like in this example; I hope you follow this...

spark.sparkContext.getConf.getOption("spark.ETL_AWS_ACCESS_KEY_ID")
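
For example, a small debugging sketch along those lines (assuming, as the spark-submit command in the question suggests, that EnvConfig reads the spark.yarn.appMasterEnv.* variables from the environment; the names and log statements are illustrative only):

// Values passed as --conf spark.yarn.appMasterEnv.X are exported as environment
// variables to the YARN application master, so in cluster mode check sys.env:
val log = Logger.getLogger("etl")
val etlNameFromEnv = sys.env.get("ETL_NAME")
val accessKeyIsSet = sys.env.get("ETL_AWS_ACCESS_KEY_ID").isDefined
log.info(s"ETL_NAME from env: $etlNameFromEnv")
log.info(s"ETL_AWS_ACCESS_KEY_ID present in env: $accessKeyIsSet")

// Plain --conf spark.xxx properties, by contrast, show up on the SparkConf:
val keyFromConf = spark.sparkContext.getConf.getOption("spark.ETL_AWS_ACCESS_KEY_ID")
log.info(s"spark.ETL_AWS_ACCESS_KEY_ID from conf: $keyFromConf")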

Once you have checked that, an approach like this example should work...

    /**
      * Hadoop-AWS Configuration
      */
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.host", proxyHost)
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.port", proxyPort)
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.server-side-encryption-algorithm", "AES256")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3n.server-side-encryption-algorithm", "AES256")
    sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

Another thing: you can pass --master yarn or --master local[*] directly to spark-submit instead of

--conf spark.yarn.appMasterEnv.ETL_SPARK_MASTER=yarn

UPDATE:

--conf spark.driver.port=20002 may fix the issue, where 20002 is an arbitrary port. It seems the application master waits on that specific port for a while, retrying for some time, and then fails with the exception you are getting.

I got this idea by going through the Spark ApplicationMaster code from here,

and from the comment there: "This is a bit hacky, but we need to wait until the spark.driver.port property has been set by the thread executing the user class."
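
Putting the suggestions together, the submit command from the question would look roughly like this (illustrative only: the ETL_SPARK_MASTER variable is dropped in favour of --master yarn, and 20002 is just an arbitrary free port):

spark-submit --class etl.SparkDataProcessor \
  --master yarn --deploy-mode cluster \
  --conf spark.driver.port=20002 \
  --conf spark.yarn.appMasterEnv.ETL_NAME=foo \
  --conf spark.yarn.appMasterEnv.ETL_AWS_ACCESS_KEY_ID=123 \
  --conf spark.yarn.appMasterEnv.ETL_AWS_SECRET_ACCESS_KEY=abc \
  MY-Tool.jar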

You can try this and let me know how it goes.

Further reading: Apache Spark: How to change the port on which the Spark driver listens

