首页 > 解决方案 > 异步并行循环返回任务之外太快,除非我在循环中检查完整的布尔值

问题描述

我的函数会很快返回信息。这真是个好消息!但是,因为并行循环正在异步运行,所以函数在循环完成之前返回一个值,除非我在主线程上执行一些长时间运行的任务以等待结果。没有 UI 可以阻止,所以我正在使用 async/await 来尝试推动 TPL 中的快速结果。

我引入了一个布尔标志值和 while 循环来等待结果。

这有效,但看起来很奇怪。

在我的特殊情况下,有没有更好的方法来“等待”结果。当我使用“while 循环”时,在第一个代码片段中,事情看起来很奇怪。

注意:我应该提到,因为它是一个 Alexa 响应,所以在这个函数之外是一个任务,它会“Task.Delays”八秒钟,然后返回一个响应,以防我的其他任务需要很长时间并且 Alexa 将超时。

private static string SavedImageAnalysisResult(IReadOnlyCollection<Image> savedImageList, ConfigurationDto config)
    {
        string result = "[]";
        BreakImageAnalysis = false;

        if (!savedImageList.Any()) return result;

        Parallel.ForEach(savedImageList, new ParallelOptions
        {
            MaxDegreeOfParallelism = 5000
        },
            async (image, loopState) =>
            {
                Task.Run(() => Console.Write("██"));

                string threadLocalAnalysisResult =
                    await AnalyzeImageAsync(image.ImageBytes, config);

                if (IsEmptyOrErrorAnalysis(threadLocalAnalysisResult)) return;
                Task.Run(() => Console.Write("█ █"));
                result = threadLocalAnalysisResult;
                BreakImageAnalysis = true;
                loopState.Break();
            });

        while (!BreakImageAnalysis) if (BreakImageAnalysis) break; //strange to do this?

        return result;
    }

这个函数是这样调用的:

public static List<Person> DetectPersonAsync()
    {
        Task.Run(() => Console.WriteLine("{0}\nNew Person Detection Requested...", DateTime.Now.ToString("f")));

        ConfigurationDto config = Configuration.GetSettings();

        camera = new SecurityCamera();

        byte[] imageData = camera.GetImageAsByte(config.SecurityCameraUrl +
                                                 config.SecurityCameraStaticImage +
                                                 DateTime.Now);

        if (!imageData.Any()) return null;

        string imageAnalysis = "[]";

        SavedImageList = camera.ImageCache;

        Task.Run(() => Console.WriteLine("\nBegin Image Analysis...\n"));

        var imageAnalysisTasks = new[]
        {
            Task.Factory.StartNew(() => SavedImageAnalysisResult(SavedImageList, config)),
            Task.Factory.StartNew(() => InProgressImageAnalysisResult(camera, config))
        };

        Task.WaitAll(imageAnalysisTasks);

        Task.Run(() => Console.WriteLine("\n\nAnalysis complete\n"));

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[0].Result))
            imageAnalysis = imageAnalysisTasks[0].Result;

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[1].Result))
            imageAnalysis = imageAnalysisTasks[1].Result;

        return !IsEmptyOrErrorAnalysis(imageAnalysis)
            ? JsonConvert.DeserializeObject<List<Person>>(imageAnalysis)
            : new List<Person>();
    }

但是这个函数是这样调用的:

  if (alexa.IsLaunchRequest(alexaRequest))
   {
    //We don't want to wait for these two tasks to return
    Task.Run(() => SendSecurityImageToMagicMirrorUi());
    Task.Run(() => alexa.PostDirectiveResponseAsync(alexaRequest));

     //On your marks get set go! 8 seconds and counting
            return await Task.WhenAny(new[]
           {

               Task.Run(() => GetAlexaCognitiveResponseAsync()),
               Task.Run(() => AlexaRequestTimeoutMonitor())

      }).Result;
     }

最后是 Timeout 函数,如果 8 秒结束,它将返回:

 private static async Task<object> AlexaRequestTimeoutMonitor()
    {
        await Task.Delay(new TimeSpan(0, 0, 0, 8));

        return AlexaApi.ResponseBuilder(CreateNoPersonDetectionPhrase(new AlexaSynthesisResponseLibrary()), false);
    }

它在“CreateNoPersonDetectedPhrase”函数中,我将“IsFound”布尔标志转回“false”

标签: c#asynchronousparallel-processingasync-awaittask

解决方案


Parallel.ForEach启动异步委托,实际上它们是并行执行的,直到作为承诺await AnalyzeImageAsync返回的点。Task现在有了这个承诺,Parallel.ForEach“认为”这个任务已经完成,而实际上异步操作很可能才刚刚开始。但是没有人在等待,所以Parallel.ForEach很快就完成了。因此,您需要在Parallel.ForEach循环之外公开这些承诺,以便可以等待它们Task.WhenAll

但我也建议考虑一下你是否需要这种并行化,因为如果大部分AnalyzeImageAsync操作(这里没有介绍)不是 CPU 绑定的,那么只使用异步更合理。

等待异步任务的最简单方法(我不是说最好)Parallels.Forxxx

    public static async Task SomeAsyncTask()
    {
        await Task.Delay(5000);
        Console.WriteLine("2222");
    }

    public static async Task Loop()
    {
        var collection = new[] { 1, 2, 3, 4 };
        var asyncTasks = new Task[4];

        Parallel.ForEach(collection,
            (item, loop, index) =>
            {
                Console.WriteLine("1111");
                asyncTasks[index] = SomeAsyncTask();
            });

        await Task.WhenAll(asyncTasks);
    }

请注意,之后的所有内容SomeAsyncTask()都应移至任务的继续(在本例中为Console.WriteLine("2222"))。


推荐阅读