首页 > 解决方案 > 如何确保我的 Apache Spark 设置代码只运行一次?

问题描述

我正在 Scala 中编写一个 Spark 作业,它读取 S3 上的 parquet 文件,进行一些简单的转换,然后将它们保存到 DynamoDB 实例。每次运行时,我们都需要在 Dynamo 中创建一个新表,因此我编写了一个负责创建表的 Lambda 函数。我的 Spark 作业做的第一件事是生成一个表名,调用我的 Lambda 函数(将新表名传递给它),等待创建表,然后正常进行 ETL 步骤。

但是,看起来我的 Lambda 函数一直被调用两次。我无法解释。这是代码示例:

def main(spark: SparkSession, pathToParquet: String) {

  // generate a unique table name
  val tableName = generateTableName()

  // call the lambda function
  val result = callLambdaFunction(tableName)

  // wait for the table to be created
  waitForTableCreation(tableName)

  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
  spark.sparkContext.stop()
}

等待表创建的代码非常简单,如您所见:

def waitForTableCreation(tableName: String) {
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try {
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
  } catch {
      case ex: WaiterTimedOutException =>
        LOGGER.error("Timed out waiting to create table: " + tableName)
        throw ex
      case t: Throwable => throw t
  }
}

lambda 调用同样简单:

def callLambdaFunction(tableName: String) {
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaClient(AWSLambdaClientBuilder.defaultClient)
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
    .build(classOf[MyLambdaContract])
  myLambda.invoke(new MyLambdaInput(tableName))
}

就像我说的,当我spark-submit在这段代码上运行时,它肯定会命中 Lambda 函数。但我无法解释为什么它会击中两次。结果是我在 DynamoDB 中预置了两个表。

在将其作为 Spark 作业运行的上下文中,等待步骤似乎也失败了。但是当我对等待的代码进行单元测试时,它似乎可以自己正常工作。它成功阻塞,直到表准备好。

起初,我推测可能spark-submit是将此代码发送到所有工作节点,并且它们独立运行整个事情。最初我有一个 Spark 集群,有 1 个 master 和 2 个 worker。然而,我在另一个有 1 个 master 和 5 个 worker 的集群上对此进行了测试,它再次准确地命中了 Lambda 函数两次,然后显然没有等待表创建,因为它在调用 Lambda 后不久就死了。

有人对 Spark 可能在做什么有任何线索吗?我错过了一些明显的东西吗?

更新:这是我的 spark-submit 参数,在 EMR 的“步骤”选项卡上可见。

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar

这是我的getConfiguration功能的代码:

def getConfiguration(tableName: String) : JobConf = {
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)
}

这里还有一个 Gist,其中包含我在尝试运行它时看到的一些异常日志。

标签: scalaapache-spark

解决方案


感谢 @soapergem 添加日志记录和选项。我添加了一个答案(试一试),因为它可能比评论长一点:)

总结:

对于你的最后一个问题:

如果我在“客户端”部署模式而不是“集群”部署模式下运行它,我的代码看起来可能会工作?这对这里的任何人有什么提示吗?

有关差异的更多信息,请查看https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html在您的情况下,它看起来像以客户端模式执行spark-submit的机器具有与 EMR 作业流不同的 IAM 策略。我在这里的假设是您的工作流程角色是不允许的dynamodb:Describe*,这就是为什么您500 code(从您的要点)得到例外:

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

为了证实这个假设,你执行你的部分创建表并在本地等待创建(这里没有 Spark 代码,只是java你的 main 函数的一个简单命令)并且:

  • 对于第一次执行,请确保您拥有所有权限。IMO 它将dynamodb:Describe*开启Resources: *(如果是这个原因,AFAIK 你应该Resources: Test_Emr*在生产中使用 somthing 以实现最低特权原则)
  • 对于第二次执行删除dynamodb:Describe*并检查您是否获得与要点中相同的堆栈跟踪

推荐阅读