首页 > 解决方案 > ActiveMQ:如何分叉加入?IE。当所有子任务完成时如何发出一条消息

问题描述

想象一下你有一些任务结构

Task1
Task2: 1 million separate independent Subtask[i] that can run concurrently
Task3: must run once after ALL Task2 subtasks have completed

并且Task1、Subtask[i]和Task3都由MQ消息表示。如何在 ActiveMQ 上解决这个问题?尤其是在所有子任务完成后触发 Task3 消息。

我知道,这不是排队问题,而是分叉连接问题。假设环境要求您必须为此使用 ActiveMQ。

允许使用 ActiveMQ 功能、动态队列和消费者等。不允许使用外部计数器,例如表示 Task2 进度的数据库行。

标签: javaparallel-processingjmsactivemqproducer-consumer

解决方案


隐藏在这个分叉连接问题中的是状态管理和可观察性挑战。由于排除了数据库,因此您必须依赖内存或队列中的某些内容。

  1. 为任务运行创建一个唯一的 id - 简短但有足够的空间不会像飞机定位器代码那样发生碰撞 - 即。34FDSX
  2. 将任务的所有消息发送到队列://TASK.34FDSX.DATA
  3. 向 queue://TASK.34FDSX.CONTROL 发送控制消息,其中包含任务 ID 和预期的消息总数(包括每个 messageId 也会有帮助)
  4. 当来自 queue://TASK.34FDSX.DATA 的消费者完成他们的工作时,他们应该使用他们的 messageId 或一些标识符向 queue://TASK.34FDSX.DONE 队列发送一个“完成”消息。

.CONTROL 队列和 .DONE 队列的消费者应该是同一个进程,并且可以跟踪预期和完成的任务总数。一切完成后,他可以触发事件以触发任务 #3。

这种方法将所有内容都提供为“在线”,如果在任务完成之前经过了太多时间,您还可以使 .CONTROL 和 .DONE 阅读器超时。

队列删除可以使用 ActiveMQ 目标 GC 来完成,或者在一切成功完成时作为 .CONTROL/.DONE 读取器中的清理步骤。

优点:

  • 没有无限阻塞消费者
  • 没有无限的开放交易
  • TASK 的状态是在线的,并且可以通过队列和队列指标的存在来观察——队列大小、入队数、出队数
  • 整个解决方案可以是多线程的,唯一的要求是对于给定任务,.CONTROL/.DONE 侦听器是同一个使用者,但多个任务可以有单独的 .CONTROL/.DONE 侦听器来扩展。

推荐阅读