amazon-web-services - AWS GetActivityTask 从同一个状态机执行
问题描述
提前感谢您的时间和回复。
我有一个具有以下活动的 AWS 状态机。
- 从外部 FTP 服务器拉取第一个可用的数据文件
- 处理数据(处理时间可能会有所不同)
- 将处理后的数据上传到另一个 FTP 服务器
我有一个在 EC2 实例中运行的 java 应用程序,它有 3 个线程并使用代码轮询活动,如下所示。Java 应用程序调用适当的工作人员来执行步骤 #1、2 和 3 的实际工作。 这里的重点是,这里的所有 3 个活动都应该发生在同一服务器中,因为步骤从服务器中的文件位置写入和读取.
我有数百个文件要在 FTP 服务器中处理,所以我有 5 个 Ec2 服务器运行 java 应用程序的副本。
现在我开始执行 5 次状态机。这将允许我在 5 台服务器上分发文件处理。
然而,我的问题是这样的:
如何确保来自给定状态机执行的活动由相同的 EC2 实例服务器处理。
我不希望给定的执行活动由不同的 EC2 实例处理。在下面的代码中(来自https://github.com/goosefraba/aws-step-function-activity-example/blob/master/src/main/java/at/goosefraba/ActivityProcessor.java),我没有看到getActivityTask属于特定执行的任何方式。
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));
final AWSStepFunctions client = AWSStepFunctionsClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.build();
while (true) {
GetActivityTaskResult getActivityTaskResult =
client.getActivityTask(
new GetActivityTaskRequest().withActivityArn(getArn()));
if (getActivityTaskResult.getTaskToken() != null) {
// Do work
}
}
解决方案
咨询了 AWS 技术支持,但没有得到答案。最后通过使用以下方法解决了问题;希望这对其他人有用。
通过提供一个唯一的 JobId(它可以是来自 FTPServer 的文件的名称,假设它是唯一的)来启动状态机执行。
{ "JobId": "MyUniqueJobId", }
java代码是:
final AWSStepFunctions client = AWSStepFunctionsClientBuilder
.standard()
.build();
StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
// StartReportGenerator execution
startExecutionRequest.setStateMachineArn("arn:aws:states:us-east-1:xxxxxxx:stateMachine:poc");
String uuid = UUID.randomUUID().toString();
startExecutionRequest.setName(uuid);
String inputJson = "{\"JobId\":\"MyUniqueJobId\"}";
startExecutionRequest.setInput(inputJson);
client.startExecution(startExecutionRequest);
检索活动后,检索 json 并检查它是否等于我在启动状态机时设置的那个。如果相等,那么我已经选择了属于我开始的执行的活动。否则,向 Activity 发送失败并返回错误“ Retry ”
GetActivityTaskResult getActivityTaskResult = client.getActivityTask( new GetActivityTaskRequest().withActivityArn("activityARN)); if (getActivityTaskResult.getTaskToken() != null) { log.info("Kicking off {} acitivity ...", getProcessName()); String errorMessage = ""; final JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput()); String jobId = json.get("JobId").textValue(); if (!jobId.equals("**MyUniqueJobId**")){ log.error("Looking to retrieve JobId MyUniqueJobId, but found " + jobId + " instead; retrying"); errorMessage = "Retry"; client.sendTaskFailure( new SendTaskFailureRequest() .withError(errorMessage) .withTaskToken(getActivityTaskResult.getTaskToken())); } }
状态机 json 的构造方式是,如果活动因“重试”错误而失败,则再次尝试相同的活动。
{
"Comment": "State machine",
"StartAt": "RunActivityOne",
"States": {
"RunActivityOne": {
"Type": "Task",
"TimeoutSeconds": 600,
"ResultPath": "$.Result",
"Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityOne",
"Catch": [
{
"ErrorEquals": [
"Retry"
],
"ResultPath": "$.Result",
"Next": "RunActivityOne"
},
{
"ErrorEquals": [
"States.TaskFailed",
"States.Timeout"
],
"Next": "RunActivityOneFailure"
}
],
"Next": "RunActivityTwo"
},
"RunActivityOneFailure": {
"Type": "Fail",
"Cause": "RunActivityOneFailure",
"Error": "RunActivityOneFailure"
},
"RunActivityTwo": {
"Type": "Task",
"TimeoutSeconds": 600,
"ResultPath": "$.Result",
"Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityTwo",
"Catch": [
{
"ErrorEquals": [
"Retry"
],
"ResultPath": "$.Result",
"Next": "RunActivityTwo"
},
{
"ErrorEquals": [
"States.TaskFailed",
"States.Timeout"
],
"Next": "RunActivityTwoFailure"
}
],
"End": true
},
"RunActivityTwoFailure": {
"Type": "Fail",
"Cause": "RunActivityTwoFailure",
"Error": "RunActivityTwoFailure"
}
}
}
这样,我最终将只处理属于我开始的执行的活动。这种方法的缺点是:
- 我们不能说最终选择正确的活动需要多少次尝试。
- AWS 根据转换次数收费
推荐阅读
- python - 在 2 个不同的绘图数据框直方图上共享相同的 x 轴
- java - 如何让 MappingIterator 不包含空行?
- react-native - 带有图像的卡片组件不完全适合卡片
- c++ - 如何在旧版驱动程序中使用 PE 标头进行用户修改后获取原始文件类型
- android - 在 Android 上找不到 React Native 中的捆绑图像
- android - Maven 库不会从 POM 中的私有存储库中获取依赖项
- python - 如何使用python将列插入excel文件?
- reactjs - 使用reactjs根据最近时间和数据类型列出数据?
- php - 在 PhpStorm 中开发 WordPress 插件:“找不到路径”-“未解析包含表达式”
- angular - Typescript导入别名+桶文件