首页 > 解决方案 > AWS Step Functions 如何处理 Worker/Activity 竞争条件?

问题描述

我正在尝试将 Java 与带有 AWS Step Functions 的 Spring Boot 框架一起使用,作为下面的示例之一。在这里,随着服务的启动,我们在 Java 中设置了一个 Runnable 线程,并将这个线程注册为一个“工作”线程,它是 AWS Step Function 中的一个 Activity。

这是我的问题:

当注册相同 Activity(相同 ARN)的多个工作人员时,AWS Step Function 如何处理竞争条件,Step Function 如何避免超过 1 个工作人员拿起任务并执行重复工作?在 AWS Step Functions 配置中,我们是否可以更改任何设置以防止在同一 AZ、同一区域的多个工作人员中发生这种竞争情况?


    import com.amazonaws.services.stepfunctions.AWSStepFunctions;
    import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
    import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;

    @Component
    class MyActivity implements DisposableBean, Runnable {

      private final int WAITS_FOR_ACTIVITY_MILLISECONDS = 500;
    

      public void run() {
        while (shouldRun) {
          GetActivityTaskResult getActivityTaskResult =
              client.getActivityTask(
                  new GetActivityTaskRequest()
                      .withActivityArn(config.getActivityArns().getMyActivity()));
          String taskToken = getActivityTaskResult.getTaskToken();
          if (getActivityTaskResult.getTaskToken() != null) {
            try {

              // Get input
              JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
              ActivityInput input =
                  gson.fromJson(json.toString(), ActivityInput.class);

              // do some work

              
              client.sendTaskSuccess(
                  new SendTaskSuccessRequest()
                      .withOutput(gson.toJson(output))
                      .withTaskToken(taskToken));
            } catch (Exception e) {
              logger.error(e.getMessage());
              e.printStackTrace();
              client.sendTaskFailure(
                  new SendTaskFailureRequest().withTaskToken(taskToken).withError(e.getMessage()));
            }
          } else {
            try {
              Thread.sleep(WAITS_FOR_ACTIVITY_MILLISECONDS);
            } catch (InterruptedException e) {
              logger.error(e.getMessage());
              e.printStackTrace();
            }
          }
        }
      }
  


      @Override
      public void destroy() {
        shouldRun = false;
      }

    }

标签: javaamazon-web-servicesspring-bootaws-step-functions

解决方案


多个活动工作者可以轮询相同类型的活动。Step Functions 管理多个同时执行的状态机和轮询工作的多个活动工作者之间的多对多关系。

每次活动工作人员成功地从处于感兴趣活动任务状态的执行状态机轮询工作时,Step Functions 都会向活动工作人员分派一个唯一令牌以及对任务状态的 JSON 输入。

下一个活动工作者轮询将不会被分配相同的任务。当活动工作人员使用 ActivityTaskSuccess 或 ActivityTaskFailure 调用 Step Functions API 时,它会返回结果和令牌。Step Function 使用令牌将结果与适当的状态机相匹配。对于大型工作负载,您可以创建活动工作人员的 Auto Scaling 组并根据需求扩展工作人员。


推荐阅读