首页 > 解决方案 > 任务合并结果并继续

问题描述

我有 16 个任务做同样的工作,每个任务都返回一个数组。我想成对组合结果并做同样的工作,直到我只有一项任务。我不知道这样做的最佳方法是什么。

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets, int numTasks)
    {
        var tasks = new Task[numTasks];
        var netsPerTask = nets.Length/numTasks;
        var start = 0;
        var concurrentSet = new ConcurrentBag<IComparatorNetwork>();
        
        for(var i = 0; i  < numTasks; i++)
        {
            IComparatorNetwork[] taskNets;
            if (i == numTasks - 1)
            {
                taskNets = nets.Skip(start).ToArray();                 
            }
            else
            {
                taskNets = nets.Skip(start).Take(netsPerTask).ToArray();
            }

            start += netsPerTask;
            tasks[i] = Task.Factory.StartNew(() =>
            {
                var pruner = new Pruner();
                concurrentSet.AddRange(pruner.Prune(taskNets));
            });
        }

        Task.WaitAll(tasks.ToArray());

        if(numTasks > 1)
        {
            return Prune(concurrentSet.ToArray(), numTasks/2);
        }

        return concurrentSet.ToArray();
    }

现在我正在等待所有任务完成,然后我重复一半的任务,直到我只有一个。我希望不必在每次迭代中都等待。我对并行编程很陌生,可能这种方法很糟糕。我试图并行化的代码如下:

public IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
    {
        var result = new List<IComparatorNetwork>();

        for (var i = 0; i < nets.Length; i++) 
        {
            var isSubsumed = false;

            for (var index = result.Count - 1; index >= 0; index--)
            {
                var n = result[index];

                if (nets[i].IsSubsumed(n))
                {
                    isSubsumed = true;
                    break;
                }

                if (n.IsSubsumed(nets[i]))
                {
                    result.Remove(n);
                }
            }

            if (!isSubsumed) 
            {
                result.Add(nets[i]);
            }
        }

        return result.ToArray();
    }`

标签: c#.netparallel-processingtask

解决方案


所以你在这里基本上做的是聚合值,但并行。幸运的是,PLINQ 已经有一个并行工作的 Aggregate 实现。因此,在您的情况下,您可以简单地将原始数组中的每个元素包装在其自己的一个元素数组中,然后您的Prune操作能够将任意两个网络数组组合成一个新的单个数组。

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AsParallel()
        .Aggregate((a, b) => new Pruner().Prune(a.Concat(b).ToArray()));
}

我对他们的聚合方法的内部知识不是很了解,但我想它可能非常好,并且不会花费大量时间不必要地等待。但是,如果你想自己写,这样你就可以确保工人总是在他们有新工作时就开始新工作,这是我自己的实现。随意在您的特定情况下比较两者,看看哪个最适合您的需求。请注意,PLINQ 可以通过多种方式进行配置,您可以随意尝试其他配置,看看哪种配置最适合您的情况。

public static T AggregateInParallel<T>(this IEnumerable<T> values, Func<T, T, T> function, int numTasks)
{
    Queue<T> queue = new Queue<T>();
    foreach (var value in values)
        queue.Enqueue(value);
    if (!queue.Any())
        return default(T);  //Consider throwing or doing something else here if the sequence is empty

    (T, T)? GetFromQueue()
    {
        lock (queue)
        {
            if (queue.Count >= 2)
            {
                return (queue.Dequeue(), queue.Dequeue());
            }
            else
            {
                return null;
            }
        }
    }

    var tasks = Enumerable.Range(0, numTasks)
        .Select(_ => Task.Run(() =>
        {
            var pair = GetFromQueue();
            while (pair != null)
            {
                var result = function(pair.Value.Item1, pair.Value.Item2);
                lock (queue)
                {
                    queue.Enqueue(result);
                }
                pair = GetFromQueue();
            }
        }))
        .ToArray();
    Task.WaitAll(tasks);
    return queue.Dequeue();
}

此版本的调用代码如下所示:

public static IComparatorNetwork[] Prune2(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AggregateInParallel((a, b) => new Pruner().Prune(a.Concat(b).ToArray()), nets.Length / 2);
}

正如评论中提到的,您可以通过让修剪器Prune接受两个集合而不只是一个集合,并且只将每个集合中的项目与另一个进行比较,知道来自同一集合的所有项目不会包含任何其他集合,从而使修剪器的方法更加有效那个集合。这使得该方法不仅更短、更简单、更易于理解,而且还消除了相当大一部分昂贵的比较。一些小的修改也可以大大减少创建的中间集合的数量。

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> first, IReadOnlyList<IComparatorNetwork> second)
{
    var firstItemsNotSubsumed = first.Where(outerNet => !second.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    var secondItemsNotSubsumed = second.Where(outerNet => !first.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    return firstItemsNotSubsumed.Concat(secondItemsNotSubsumed).ToList();
}

调用代码只需要进行细微的调整以确保类型匹配并且您传入两个集合而不是先连接它们。

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> nets)
{
    return nets.Select(net => (IReadOnlyList<IComparatorNetwork>)new[] { net })
        .AggregateInParallel((a, b) => Pruner.Prune(a, b), nets.Count / 2);
}

推荐阅读