首页 > 解决方案 > BlockingCollection 中的多个消费者是否同时处理?

问题描述

我有一个 BlockingCollection,我从一个线程写入并从另一个线程读取。生产者线程获取从服务器接收的项目并将它们添加到 BlockingCollection,而读取线程尝试清空 BlockingCollection 并处理它们。

我试图分批清空队列的问题,因为一个一个处理它们会太慢。但是当它不断被写入(数千个项目)时,消费者线程会一直读取它们直到它被清空,这意味着在写入完成之前处理甚至不会开始。

现在,消费者中的处理可以并行完成,所以我一直想知道如何去做。

目前我有2个想法:

  1. 在消费者中从 BlockingCollection 读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待完全清空队列然后开始处理。

  2. 使用多个消费者并希望它们并行运行,而不是在尝试同时读取 BlockingCollection 时不断地相互阻塞。

所以我的问题是关于选项 2 - BlockingCollection 是否针对这种情况进行了内部优化?它会划分读取的区域,还是消费者会为每个项目争吵?如果是这样的话,那么选项1更好吗?

标签: c#multithreadingconcurrencyblockingcollection

解决方案


添加另一种选择:(绝不是生产就绪!)

这利用了 TPL 的 Dataflow,它BatchBlock<T>为我们抽象了批处理。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class HoneyBatcher
{
    private const int BATCHSIZE = 10; // Find the size that works best for you.
    private readonly BatchBlock<Honey> batchBlock;
    private readonly ExecutionDataflowBlockOptions _options = 
                     new ExecutionDataflowBlockOptions()
    {
         // I'd start with 1, then benchmark if higher number actually benefits.
         MaxDegreeOfParallelism = 1, 
         SingleProducerConstrained = true // if so, may micro-optimize throughput
    };
                       // vv Whatever process you want done on a batch
    public HoneyBatcher( Action<Honey[]> batchProcessor )
    {
        // BatchBlock does the batching
        // and is the entrypoint to the pipline.
        batchBlock = new BatchBlock<Honey>(BATCHSIZE);
        // processorBlock processes each batch that batchBlock will produce
        // Parallel executions as well as other tweaks can be configured through options.
        ActionBlock<Honey[]> processorBlock = 
                             new ActionBlock<Honey[]>(batchProcessor, _options);
        // build the pipline
        batchBlock.LinkTo(processorBlock);
        // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
    }

    // Add item individually and have them batched up
    // and processed in a pipeline.
    public Task<bool> ProcessAsync(Honey item)
    {
        return batchBlock.SendAsync(item);
        // Can also be done with sync API.
    }
}

public class Honey 
{
    // Just a dummy
}

请注意,上面的代码片段只是这个想法的粗略布局。在生产中,您当然会解决错误处理、完成等问题。


推荐阅读