首页 > 解决方案 > 安排以不同的速率向消费者发送消息

问题描述

我正在寻找消息调度的最佳算法。我的意思是消息调度是当我们有许多消费者以不同的速率在总线上发送消息的一种方式。

示例:假设我们有数据 D1 到 Dn。D1 每 5ms 发送给多个消费者 C1,每 19ms C2,每 30ms C3,每 Rn ms 发送 Cn。Dn 每 10ms 发送到 C1,C2 每 31ms,Cn 每 50ms

以最佳性能(CPU、内存、IO)安排此操作的最佳算法是什么?

问候

标签: algorithmoptimizationscheduling

解决方案


我能想到很多选择,每个都有自己的成本和收益。真正归结为您的需求是什么——真正为您定义“最佳”的东西。我在下面对几种可能性进行了伪编码,希望能帮助您入门。

选项 1:每个时间单位执行以下操作(在您的示例中,毫秒)

func callEachMs
    time = getCurrentTime()
    for each datum
        for each customer
            if time % datum.customer.rate == 0
                sendMsg()

这样做的好处是不需要一致存储的内存——您只需在每个时间单位检查您是否应该发送消息。这也可以处理未发送time == 0的消息 - 只需存储消息最初发送的时间以速率为模,并将条件替换为if time % datum.customer.rate == data.customer.firstMsgTimeMod.

这种方法的一个缺点是它完全依赖于总是以 1 毫秒的速率被调用。如果 CPU 上的另一个进程导致延迟并且它错过了一个周期,那么您可能会完全错过发送消息(而不是稍微延迟发送)。

选项 2:维护一个元组列表列表,其中每个条目代表需要在该毫秒内完成的任务。使您的列表至少与最长速率除以时间单位一样长(如果您的最长速率是 50 毫秒并且您要以毫秒为单位,那么您的列表必须至少为 50 长)。当你启动你的程序时,放置第一个消息将被发送到队列中。然后每次发送消息时,在下次发送时在该列表中更新。

func buildList(&list)
    for each datum
        for each customer
            if list.size < datum.customer.rate
                list.resize(datum.customer.rate+1)
            list[customer.rate].push_back(tuple(datum.name, customer.name))

func callEachMs(&list)
    for each (datum.name, customer.name) in list[0]
        sendMsg()
        list[customer.rate].push_back((datum.name, customer.name))
    list.pop_front()
    list.push_back(empty list)

这样做的好处是避免了许多不必要的模数计算选项 1 所需的。但是,这会带来内存使用量增加的成本。如果您的各种消息的速率存在很大差异,则此实现也不会有效(尽管您可以修改它以更有效地处理具有更长速率的算法)。它仍然必须每毫秒调用一次。

最后,您必须非常仔细地考虑您使用的数据结构,因为这将对其效率产生巨大影响。因为您在每次迭代时从前面弹出并从后面推送,并且列表是固定大小的,所以您可能希望实现循环缓冲区以避免不必要的值移动。对于元组列表,因为它们只被迭代(不需要随机访问),并且经常添加,单链表可能是您最好的解决方案。

.

显然,还有更多方法可以做到这一点,但希望这些想法可以帮助您入门。另外,请记住,您运行它的系统的性质可能会对哪种方法效果更好,或者您是否想要完全做其他事情产生强烈影响。例如,这两种方法都要求能够以一定的速率可靠地调用它们。我也没有描述并行化实现,如果您的应用程序支持它们,这可能是最好的选择。


推荐阅读