java - AWS Step Functions 如何处理 Worker/Activity 竞争条件?
问题描述
我正在尝试将 Java 与带有 AWS Step Functions 的 Spring Boot 框架一起使用,作为下面的示例之一。在这里,随着服务的启动,我们在 Java 中设置了一个 Runnable 线程,并将这个线程注册为一个“工作”线程,它是 AWS Step Function 中的一个 Activity。
这是我的问题:
当注册相同 Activity(相同 ARN)的多个工作人员时,AWS Step Function 如何处理竞争条件,Step Function 如何避免超过 1 个工作人员拿起任务并执行重复工作?在 AWS Step Functions 配置中,我们是否可以更改任何设置以防止在同一 AZ、同一区域的多个工作人员中发生这种竞争情况?
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
@Component
class MyActivity implements DisposableBean, Runnable {
private final int WAITS_FOR_ACTIVITY_MILLISECONDS = 500;
public void run() {
while (shouldRun) {
GetActivityTaskResult getActivityTaskResult =
client.getActivityTask(
new GetActivityTaskRequest()
.withActivityArn(config.getActivityArns().getMyActivity()));
String taskToken = getActivityTaskResult.getTaskToken();
if (getActivityTaskResult.getTaskToken() != null) {
try {
// Get input
JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
ActivityInput input =
gson.fromJson(json.toString(), ActivityInput.class);
// do some work
client.sendTaskSuccess(
new SendTaskSuccessRequest()
.withOutput(gson.toJson(output))
.withTaskToken(taskToken));
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
client.sendTaskFailure(
new SendTaskFailureRequest().withTaskToken(taskToken).withError(e.getMessage()));
}
} else {
try {
Thread.sleep(WAITS_FOR_ACTIVITY_MILLISECONDS);
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
}
@Override
public void destroy() {
shouldRun = false;
}
}
解决方案
多个活动工作者可以轮询相同类型的活动。Step Functions 管理多个同时执行的状态机和轮询工作的多个活动工作者之间的多对多关系。
每次活动工作人员成功地从处于感兴趣活动任务状态的执行状态机轮询工作时,Step Functions 都会向活动工作人员分派一个唯一令牌以及对任务状态的 JSON 输入。
下一个活动工作者轮询将不会被分配相同的任务。当活动工作人员使用 ActivityTaskSuccess 或 ActivityTaskFailure 调用 Step Functions API 时,它会返回结果和令牌。Step Function 使用令牌将结果与适当的状态机相匹配。对于大型工作负载,您可以创建活动工作人员的 Auto Scaling 组并根据需求扩展工作人员。
推荐阅读
- python - python将mailBody附加到电子邮件中不起作用
- python-3.x - "0, 0, 0, 0 in b for b in bytes(4)" 中的 0 是什么
- json - JSR223 预处理器在使用 CSV 在 Jmeter 中创建 json 时发出错误请求
- c# - 在 VS 扩展中调用抽象和派生的 tt 模板
- javascript - 从 html 转换为 docx 格式时,Pandoc 不渲染 SVG 图像
- java - 检测框架布局内 webview 的触摸
- angular - 找不到模块“id”
- module - Odoo 10:安装后在顶部菜单栏中看不到我的自定义模块
- python - 如何通过命令行切换进程状态?
- vba - 打印宏 - 问题