首页 > 解决方案 > BlockingCollection 每组多个消费者 FIFO

问题描述

我有一个将消息插入 BlockingCollection 的生产者。

集合中的每个消息都有一个 groupId。不同 groupId 的计数是动态的。每组的消息必须按 FIFO 顺序处理。消费者所做的工作是一个小的计算和一个数据库插入,有时是一个额外的 http 请求。

我尝试为每个 groupId 创建一个新的消费者线程 - 但线程过多存在问题。

当我像这样创建 50 个消费者线程时,消息的处理速度要快得多:

for (int i = 0; i < 50; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            var group = item.groupId;
            // do work
        }
    }
}

但是使用此代码,每个 groupId 的消息不会按顺序处理。

有没有一种简单的方法可以让每个 groupId 的消费者“粘性”?

例如:当消费者 3 处理了 groupId 为 7 的消息时,所有其他 groupId 为 7 的消息必须由消费者 3 处理。

标签: c#queuefifo

解决方案


推荐阅读