首页 > 解决方案 > 可枚举的 foreach 扩展

问题描述

我创建了 Enumerable 的扩展来快速执行操作,所以我列出了并在这个方法中,我循环并且如果对象在某个超时执行该方法我返回,现在我想让输出通用,因为方法输出会有所不同,关于做什么的任何建议

这个IEnumerable的进程,就像负载均衡,如果第一个没有响应第二个应该,我想返回输入Action的输出

public static class EnumerableExtensions
{
    public static void ForEach<T>(this IEnumerable<T> source, Action action, int timeOut)
    {            
        foreach (T element in source)
        {
            lock (source)
            {
                // Loop for all connections and get the fastest responsive proxy 
                foreach (var mxAccessProxy in source)
                {
                    try
                    {
                        // check for the health 
                        Task executionTask = Task.Run(action);
                        if (executionTask.Wait(timeOut))
                        {
                            return  ;
                        }
                    }
                    catch
                    {
                        //ignore 
                    }

                }
            }
        }
    }
}

这段代码像

  _proxies.ForEach(certainaction, timeOut);

标签: c#ienumerable

解决方案


这将提高性能和代码可读性

不,它绝对不会 :) 此外,您会为这段代码带来更多问题,例如冗余锁定或异常吞咽,但实际上并没有并行执行代码。

似乎您希望Action使用某种代理对象获得最快的调用。您需要Tasks异步运行,而不是因此使用.Wait().

这样的事情可能对您有所帮助:

public static class TaskExtensions
{
    public static TReturn ParallelSelectReturnFastest<TPoolObject, TReturn>(this TPoolObject[] pool,
        Func<TPoolObject, CancellationToken, TReturn> func, 
        int? timeout = null)
    {
        var ctx = new CancellationTokenSource();

        // for every object in pool schedule a task
        Task<TReturn>[] tasks = pool
            .Select(poolObject =>
            {
                ctx.Token.ThrowIfCancellationRequested();
                return Task.Factory.StartNew(() => func(poolObject, ctx.Token), ctx.Token);
            })
            .ToArray();

       // not sure if Cast is actually needed, 
       // just to get rid of co-variant array conversion
       int firstCompletedIndex = timeout.HasValue
            ? Task.WaitAny(tasks.Cast<Task>().ToArray(), timeout.Value, ctx.Token)
            : Task.WaitAny(tasks.Cast<Task>().ToArray(), ctx.Token);

        // we need to cancel token to avoid unnecessary work to be done
        ctx.Cancel();

        if (firstCompletedIndex == -1) // no objects in pool managed to complete action in time
            throw new NotImplementedException(); // custom exception goes here

        return tasks[firstCompletedIndex].Result;
    }
}

现在,您可以使用此扩展方法对任何对象池调用特定操作并获取第一个执行结果:

var pool = new[] { 1, 2, 3, 4, 5 };
var result = pool.ParallelSelectReturnFastest((x, token) => { 
    Thread.Sleep(x * 200); 
    token.ThrowIfCancellationRequested();
    Console.WriteLine("calculate");
    return x * x; 
}, 100);    
Console.WriteLine(result);    

它输出:

计算 1

因为第一个任务会在 200ms 内完成工作,所以返回它,其他所有任务都会通过取消令牌取消。

在您的情况下,它将类似于:

var actionResponse = proxiesList.ParallelSelectReturnFastest((proxy, token) => {
    token.ThrowIfCancellationRequested();
    return proxy.SomeAction();        
});

有几点要提:

  • 确保您的行为是安全的。您不能依赖其中有多少会实际执行您的操作。如果此操作是CreateItem,那么您最终可以通过不同的代理创建许多项目
  • 它不能保证您将并行运行所有这些操作,因为由 TPL 选择最佳的运行任务数量
  • 我已经以老式的 TPL 方式实施,因为您最初的问题包含它。如果可能,您需要切换到 async/await - 在这种情况下,您Func将返回任务并且您需要使用await Task.WhenAny(tasks)而不是Task.WaitAny()

推荐阅读