scala - 如何确保我的 Apache Spark 设置代码只运行一次?
问题描述
我正在 Scala 中编写一个 Spark 作业,它读取 S3 上的 parquet 文件,进行一些简单的转换,然后将它们保存到 DynamoDB 实例。每次运行时,我们都需要在 Dynamo 中创建一个新表,因此我编写了一个负责创建表的 Lambda 函数。我的 Spark 作业做的第一件事是生成一个表名,调用我的 Lambda 函数(将新表名传递给它),等待创建表,然后正常进行 ETL 步骤。
但是,看起来我的 Lambda 函数一直被调用两次。我无法解释。这是代码示例:
def main(spark: SparkSession, pathToParquet: String) {
// generate a unique table name
val tableName = generateTableName()
// call the lambda function
val result = callLambdaFunction(tableName)
// wait for the table to be created
waitForTableCreation(tableName)
// normal ETL pipeline
var parquetRDD = spark.read.parquet(pathToParquet)
val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
spark.sparkContext.stop()
}
等待表创建的代码非常简单,如您所见:
def waitForTableCreation(tableName: String) {
val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
try {
waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
} catch {
case ex: WaiterTimedOutException =>
LOGGER.error("Timed out waiting to create table: " + tableName)
throw ex
case t: Throwable => throw t
}
}
lambda 调用同样简单:
def callLambdaFunction(tableName: String) {
val myLambda = LambdaInvokerFactory.builder()
.lambdaClient(AWSLambdaClientBuilder.defaultClient)
.lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
.build(classOf[MyLambdaContract])
myLambda.invoke(new MyLambdaInput(tableName))
}
就像我说的,当我spark-submit
在这段代码上运行时,它肯定会命中 Lambda 函数。但我无法解释为什么它会击中两次。结果是我在 DynamoDB 中预置了两个表。
在将其作为 Spark 作业运行的上下文中,等待步骤似乎也失败了。但是当我对等待的代码进行单元测试时,它似乎可以自己正常工作。它成功阻塞,直到表准备好。
起初,我推测可能spark-submit
是将此代码发送到所有工作节点,并且它们独立运行整个事情。最初我有一个 Spark 集群,有 1 个 master 和 2 个 worker。然而,我在另一个有 1 个 master 和 5 个 worker 的集群上对此进行了测试,它再次准确地命中了 Lambda 函数两次,然后显然没有等待表创建,因为它在调用 Lambda 后不久就死了。
有人对 Spark 可能在做什么有任何线索吗?我错过了一些明显的东西吗?
更新:这是我的 spark-submit 参数,在 EMR 的“步骤”选项卡上可见。
spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar
这是我的getConfiguration
功能的代码:
def getConfiguration(tableName: String) : JobConf = {
val conf = new Configuration()
conf.set("dynamodb.servicename", "dynamodb")
conf.set("dynamodb.input.tableName", tableName)
conf.set("dynamodb.output.tableName", tableName)
conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
conf.set("dynamodb.regionid", "us-east-1")
conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
new JobConf(conf)
}
这里还有一个 Gist,其中包含我在尝试运行它时看到的一些异常日志。
解决方案
感谢 @soapergem 添加日志记录和选项。我添加了一个答案(试一试),因为它可能比评论长一点:)
总结:
spark-submit
和配置选项没什么奇怪的- 在https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log中,您可以看到应用程序执行了两次。它从 ACCEPTED 到 RUNNING 状态经过两次。这与 EMR 默认值一致(如何防止 EMR Spark 步骤重试?)。为了确认这一点,您可以检查在执行该步骤后是否创建了 2 个表(我假设您正在生成具有动态名称的表;每次执行时使用不同的名称,在重试的情况下应该给出 2 个不同的名称)
对于你的最后一个问题:
如果我在“客户端”部署模式而不是“集群”部署模式下运行它,我的代码看起来可能会工作?这对这里的任何人有什么提示吗?
有关差异的更多信息,请查看https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html在您的情况下,它看起来像以客户端模式执行spark-submit
的机器具有与 EMR 作业流不同的 IAM 策略。我在这里的假设是您的工作流程角色是不允许的dynamodb:Describe*
,这就是为什么您500 code
(从您的要点)得到例外:
Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)
为了证实这个假设,你执行你的部分创建表并在本地等待创建(这里没有 Spark 代码,只是java
你的 main 函数的一个简单命令)并且:
- 对于第一次执行,请确保您拥有所有权限。IMO 它将
dynamodb:Describe*
开启Resources: *
(如果是这个原因,AFAIK 你应该Resources: Test_Emr*
在生产中使用 somthing 以实现最低特权原则) - 对于第二次执行删除
dynamodb:Describe*
并检查您是否获得与要点中相同的堆栈跟踪
推荐阅读
- c# - 将字符串转换为浮点数对于某些数字但不是其他数字失败
- database - 服务器端预取数据库数据是否存在任何安全风险?
- sql - 如何在 SQLite 中将两列组合在一起?
- python - PyTest 弃用:'junit_family 默认值将更改为'xunit2'
- android - 创建新的 kotlin 项目时无法解决依赖关系
- multithreading - 线程中的 QProcess 有效,但有两种类型的输出错误
- wpf - ArcGIS .NET 从 WebMercator 转换为 Wgs84
- javascript - 停止制作表单以重新加载页面
- javascript - 最后如何在没有额外 AND 的情况下更好地构建这个 SQL 查询
- typo3 - 如何覆盖页面属性中的后端/制表符标签?