首页 > 解决方案 > AWS GetActivityTask 从同一个状态机执行

问题描述

提前感谢您的时间和回复。

我有一个具有以下活动的 AWS 状态机。

  1. 从外部 FTP 服务器拉取第一个可用的数据文件
  2. 处理数据(处理时间可能会有所不同)
  3. 将处理后的数据上传到另一个 FTP 服务器

我有一个在 EC2 实例中运行的 java 应用程序,它有 3 个线程并使用代码轮询活动,如下所示。Java 应用程序调用适当的工作人员来执行步骤 #1、2 和 3 的实际工作。 这里的重点是,这里的所有 3 个活动都应该发生在同一服务器中,因为步骤从服务器中的文件位置写入和读取.

我有数百个文件要在 FTP 服务器中处理,所以我有 5 个 Ec2 服务器运行 java 应用程序的副本。

现在我开始执行 5 次状态机。这将允许我在 5 台服务器上分发文件处理。

然而,我的问题是这样的:

如何确保来自给定状态机执行的活动由相同的 EC2 实例服务器处理。

我不希望给定的执行活动由不同的 EC2 实例处理。在下面的代码中(来自https://github.com/goosefraba/aws-step-function-activity-example/blob/master/src/main/java/at/goosefraba/ActivityProcessor.java),我没有看到getActivityTask属于特定执行的任何方式。

  final ClientConfiguration clientConfiguration = new ClientConfiguration();
    clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));

    final AWSStepFunctions client = AWSStepFunctionsClientBuilder
            .standard()
            .withClientConfiguration(clientConfiguration)
            .build();

    while (true) {
        GetActivityTaskResult getActivityTaskResult =
                client.getActivityTask(
                        new GetActivityTaskRequest().withActivityArn(getArn()));
        if (getActivityTaskResult.getTaskToken() != null) {
                // Do work
        }
    }

标签: amazon-web-servicesaws-step-functions

解决方案


咨询了 AWS 技术支持,但没有得到答案。最后通过使用以下方法解决了问题;希望这对其他人有用。

  1. 通过提供一个唯一的 JobId(它可以是来自 FTPServer 的文件的名称,假设它是唯一的)来启动状态机执行。

    { "JobId": "MyUniqueJobId", }

java代码是:

        final AWSStepFunctions client = AWSStepFunctionsClientBuilder
                .standard()
                .build();
        StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
        // StartReportGenerator execution
        startExecutionRequest.setStateMachineArn("arn:aws:states:us-east-1:xxxxxxx:stateMachine:poc");
        String uuid = UUID.randomUUID().toString();
        startExecutionRequest.setName(uuid);
        String inputJson = "{\"JobId\":\"MyUniqueJobId\"}";
        startExecutionRequest.setInput(inputJson);
        client.startExecution(startExecutionRequest);
  1. 检索活动后,检索 json 并检查它是否等于我在启动状态机时设置的那个。如果相等,那么我已经选择了属于我开始的执行的活动。否则,向 Activity 发送失败并返回错误“ Retry

    GetActivityTaskResult getActivityTaskResult =
       client.getActivityTask(
                    new GetActivityTaskRequest().withActivityArn("activityARN));
    
    if (getActivityTaskResult.getTaskToken() != null) {
        log.info("Kicking off {} acitivity ...", getProcessName());
        String errorMessage = "";
        final JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
        String jobId = json.get("JobId").textValue();
        if (!jobId.equals("**MyUniqueJobId**")){
            log.error("Looking to retrieve JobId MyUniqueJobId, but found " + jobId + " instead; retrying");
            errorMessage = "Retry";
            client.sendTaskFailure(
                    new SendTaskFailureRequest()
                            .withError(errorMessage)
                            .withTaskToken(getActivityTaskResult.getTaskToken()));
        }
    }
    
  2. 状态机 json 的构造方式是,如果活动因“重试”错误而失败,则再次尝试相同的活动。

{
  "Comment": "State machine",
  "StartAt": "RunActivityOne",
  "States": {
    "RunActivityOne": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityOne",
      "Catch": [
        {
          "ErrorEquals": [
            "Retry"
          ],
          "ResultPath": "$.Result",
          "Next": "RunActivityOne"
        },
        {
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ],
          "Next": "RunActivityOneFailure"
        }
      ],
      "Next": "RunActivityTwo"
    },
    "RunActivityOneFailure": {
      "Type": "Fail",
      "Cause": "RunActivityOneFailure",
      "Error": "RunActivityOneFailure"
    },
    "RunActivityTwo": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityTwo",
      "Catch": [
        {
          "ErrorEquals": [
            "Retry"
          ],
          "ResultPath": "$.Result",
          "Next": "RunActivityTwo"
        },
        {
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ],
          "Next": "RunActivityTwoFailure"
        }
      ],
      "End": true
    },
    "RunActivityTwoFailure": {
      "Type": "Fail",
      "Cause": "RunActivityTwoFailure",
      "Error": "RunActivityTwoFailure"
    }
  }
}

这样,我最终将只处理属于我开始的执行的活动。这种方法的缺点是:

  • 我们不能说最终选择正确的活动需要多少次尝试。
  • AWS 根据转换次数收费

推荐阅读