c# - BlockingCollection 中的多个消费者是否同时处理?
问题描述
我有一个 BlockingCollection,我从一个线程写入并从另一个线程读取。生产者线程获取从服务器接收的项目并将它们添加到 BlockingCollection,而读取线程尝试清空 BlockingCollection 并处理它们。
我试图分批清空队列的问题,因为一个一个处理它们会太慢。但是当它不断被写入(数千个项目)时,消费者线程会一直读取它们直到它被清空,这意味着在写入完成之前处理甚至不会开始。
现在,消费者中的处理可以并行完成,所以我一直想知道如何去做。
目前我有2个想法:
在消费者中从 BlockingCollection 读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待完全清空队列然后开始处理。
使用多个消费者并希望它们并行运行,而不是在尝试同时读取 BlockingCollection 时不断地相互阻塞。
所以我的问题是关于选项 2 - BlockingCollection 是否针对这种情况进行了内部优化?它会划分读取的区域,还是消费者会为每个项目争吵?如果是这样的话,那么选项1更好吗?
解决方案
添加另一种选择:(绝不是生产就绪!)
这利用了 TPL 的 Dataflow,它BatchBlock<T>
为我们抽象了批处理。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class HoneyBatcher
{
private const int BATCHSIZE = 10; // Find the size that works best for you.
private readonly BatchBlock<Honey> batchBlock;
private readonly ExecutionDataflowBlockOptions _options =
new ExecutionDataflowBlockOptions()
{
// I'd start with 1, then benchmark if higher number actually benefits.
MaxDegreeOfParallelism = 1,
SingleProducerConstrained = true // if so, may micro-optimize throughput
};
// vv Whatever process you want done on a batch
public HoneyBatcher( Action<Honey[]> batchProcessor )
{
// BatchBlock does the batching
// and is the entrypoint to the pipline.
batchBlock = new BatchBlock<Honey>(BATCHSIZE);
// processorBlock processes each batch that batchBlock will produce
// Parallel executions as well as other tweaks can be configured through options.
ActionBlock<Honey[]> processorBlock =
new ActionBlock<Honey[]>(batchProcessor, _options);
// build the pipline
batchBlock.LinkTo(processorBlock);
// item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
}
// Add item individually and have them batched up
// and processed in a pipeline.
public Task<bool> ProcessAsync(Honey item)
{
return batchBlock.SendAsync(item);
// Can also be done with sync API.
}
}
public class Honey
{
// Just a dummy
}
请注意,上面的代码片段只是这个想法的粗略布局。在生产中,您当然会解决错误处理、完成等问题。
推荐阅读
- go - 如何在 Go 中对接口切片进行 json 解码?
- android - 跟踪 SVG 路径
- android - 如何使用 JAIN SIP 库接受传入的音频 SIP 呼叫?
- php - 有条件地添加到数组
- node.js - 如何编写可以从命令行调用以及从其他文件中调用的 Node.js 文件?
- android - 在 react-native 上按多个 TouchableOpacity 元素
- python - numpy:沿除最后一个轴之外的所有轴求和
- c# - 如何使用 SelectMany 缩短我的对象?
- ruby-on-rails - rspec 测试委派错误
- multithreading - Oracle 并发处理