首页 > 解决方案 > 允许所有工作人员的全局消耗率的队列服务器

问题描述

我的服务器需要处理许多任务。由于工作人员需要满足 API 调用速率限制,这些任务必须以特定的给定速率处理。

为了保证这些任务不会以高于 API 速率限制的速率执行,我希望能够配置队列发送消息以进行处理的速率。

此外,该队列必须保持推送消息的顺序并以 FIFO 顺序释放它们以提供公平性。

最后,如果编码明智,这将是很好的,这样在使用时客户端将通过 API 调用将消息发送到队列,并且同一客户端将在消息被队列释放后接收回消息工作率和相关顺序。例如使用 RxJava

waitForMessageToBeReleased(message, queue)
     .subscribe(message -> // do some stuff)  // message received to the same 
client after it was released by the queue according to the defined work rate.

我目前正在使用 Redis 通过创建一个具有特定 TTL 数量的变量来控制执行率,并且其他调用会等到该变量过期。但是,它不处理排序,并且在高负载的情况下可能导致客户端饿死。

标签: javaalgorithmmessage-queueamqp

解决方案


Cadence Workflow能够以最小的努力支持您的用例。

这是满足您要求的稻草人设计:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并将信号传递给它。
  • 对该工作流的所有请求都由它缓冲。Cadence 提供了一个硬保证,即只有一个具有给定 ID 的工作流可以在打开状态下存在。因此,所有信号(事件)都保证在属于用户的工作流中得到缓冲。
  • 内部工作流事件循环会一一分派这些请求。
  • 当缓冲区为空时,工作流程可以完成。

这是在 Java 中实现它的工作流代码(也支持 Go 客户端):

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Cadence 提供了许多其他优势。

  • 建立了指数重试,具有无限的过期间隔
  • 故障处理。例如,如果在配置的时间间隔内两次更新都不能成功,它允许执行通知另一个服务的任务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖。例如,在不可恢复的故障 ( SAGA )的情况下实现调用链或补偿逻辑
  • 提供对当前更新状态的完整可见性。例如,当使用队列时,您都知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Cadence 记录每个事件。
  • 能够取消飞行中的更新。
  • 分布式 CRON 支持

请参阅有关Cadence 编程模型的演示文稿。


推荐阅读