首页 > 解决方案 > 基于需要在执行时间内接收更多项目的列表执行 Parallel.ForEach

问题描述

我在这里遇到了一个挑战,找到解决方案让我很头疼。

我有一个List东西,我Parallel.ForEach基于它执行:

List<Customer> customers = GetNotProcessedCostumer(); 

Parallel.ForEach(customers, new ParallelOptions {MaxDegreeOfParallelism = 2},
cust=> 
{
     ExecuteSomething(cust);
});

这里的问题是我需要GetNotProcessedCostumer再次调用以检查数据库上是否有新的未处理项目可用,而此并行仍在运行。再次调用该方法是可以的,但是,如何List在并行已经使用的中插入新项目?

换句话说,它List<Customer>是活的,我需要一直在上面插入项目,并尝试使用现有的可用线程Parallel。看一看:

List<Customer> customers = GetNotProcessCustomer // get not processed customers from database

Parallel.ForEach(customers) // ...... Start the parallel ... 

customer.Add(GetNotProcessCustomer()) // Read database again..

“嘿 Parallel,你有可用的线程吗?” 如果是,请使用它。

我可以接受其他方法和想法,例如ThreadsThreadPool......

有人可以帮我吗?

标签: c#.netmultithreadingparallel.foreach

解决方案


可能有比Parallel上课更好的方法来完成这项工作,ActionBlock<Customer>来自TPL 数据流库的方法是最有希望的候选者。但是,如果您想使用已有的知识来完成工作,您可以使用延迟IEnumerable<Customer>序列而不是物化的List<Customer>. 这个序列将查询数据库并在一个永无止境的循环中产生未处理的客户。在混合中添加一个可能是个好主意Task.Delay,以确保查询数据库的频率不会超过每 X 秒。

IEnumerable<Customer> GetNotProcessedCustomersNonStop(
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        foreach (var customer in GetNotProcessedCustomers())
            yield return customer;
        delayTask.GetAwaiter().GetResult();
    }
}

在混合中添加 aCancellationToken可能也是一个好主意,因为最终你想停止循环,不是吗?

如果你不熟悉延迟可枚举序列和yield语句,你可以看看这个文档:Iterators

最后一个重要的细节是告诉Parallel类你不希望它做一些花哨的事情,比如贪婪地枚举可枚举对象并缓存它的项目。您希望它仅在准备好处理它时才能抓住下一个客户。你可以通过Partitioner.Create在混合物中加入 a 来做到这一点。把所有东西放在一起:

var cts = new CancellationTokenSource();

var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
    EnumerablePartitionerOptions.NoBuffering);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token,
};

Parallel.ForEach(source, parallelOptions, customer =>
{
    ProcessCustomer(customer);
});

//cts.Cancel(); // eventually...

推荐阅读