TPL Dataflow pipeline finishes its work but never returns from the wait on Completion

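Question

Even though all the data has been processed and shown on the console, my pipeline never registers as complete. I set it up to wait for completion, but it never completes and never lets the method return.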

    TransformBlock<string, CompanyInfo> GetCompanyInfo;
    TransformBlock<string, List<Dividend>> GetDividendReports;
    TransformBlock<string, KeyStats> GetKeyStatInfo;
    TransformBlock<string, List<Interval>> GetIntervalReports;
    TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
    BroadcastBlock<string> broadcastSymbol;
    TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
    ActionBlock<string> GenerateCompleteReport;
    CancellationTokenSource cancellationTokenSource;


    public Task StartPipeline()
    {
        cancellationTokenSource = new CancellationTokenSource();

        ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            MaxDegreeOfParallelism = MAXPARA
        };

        broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
        var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });

        GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
        {
            return RetrieveCompanyInfo(symbol);
        }, executionDataflowBlockOptions);

        GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
        {
            return RetrieveDividendInfo(symbol);
        }, executionDataflowBlockOptions);

        GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
        {
            return RetrieveKeyStats(symbol);
        }, executionDataflowBlockOptions);

        GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
        {
            return RetrieveIntervals(symbol, 30);
        }, executionDataflowBlockOptions);

        GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
        {
            return ConstructIntervalReport(intervals);
        }, executionDataflowBlockOptions);

        GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
        {
            var ReportObj = new Report
            {
                changeIntervals = tup.Item1,
                dividends = tup.Item2,
                keyStats = tup.Item3
            };

            XmlSerializer ser = new XmlSerializer(typeof(Report));
            var stringWriter = new StringWriter();
            ser.Serialize(stringWriter, ReportObj);
            return stringWriter.ToString();

        }, executionDataflowBlockOptions);

        GenerateCompleteReport = new ActionBlock<string>(xml =>
        {
            var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
            File.WriteAllText(str, xml);
            Console.WriteLine("Finished File");
        }, executionDataflowBlockOptions);

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<string>();
        buffer.LinkTo(broadcastSymbol);

        //Broadcasts the symbol
        broadcastSymbol.LinkTo(GetIntervalReports, options);
        broadcastSymbol.LinkTo(GetDividendReports, options);
        broadcastSymbol.LinkTo(GetKeyStatInfo, options);
        //Second tier, runs in parallel
        GetIntervalReports.LinkTo(GetChangesOverInterval, options);
        //Joins the parallel blocks back together
        GetDividendReports.LinkTo(joinblock.Target2, options);
        GetKeyStatInfo.LinkTo(joinblock.Target3, options);
        GetChangesOverInterval.LinkTo(joinblock.Target1, options);

        joinblock.LinkTo(GenerateXmlString, options);
        GenerateXmlString.LinkTo(GenerateCompleteReport, options);

        buffer.Post("F");
        buffer.Post("AGFS");
        buffer.Post("BAC");
        buffer.Post("FCF");

        buffer.Complete();

        GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);

    }

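I'm not sure why the pipeline neither throws an exception nor completes. When the program runs, it shows all the files being created and then stops, but no code after the wait on completion ever executes. Shouldn't PropagateCompletion let the blocks know when they have finished their action or transform?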

Tags: c#, task-parallel-library, tpl-dataflow

Answer


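You are not passing the link options when you link your BufferBlock, so completion is not propagated past it. On the other hand, only one linked block will propagate completion from your BroadcastBlock. If you want to wait on all three linked blocks, you have to handle that explicitly yourself. For an example, see here.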
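The effect of the missing link options can be reproduced in isolation. The following is only a minimal sketch, not the asker's pipeline: `CompletionPropagationSketch`, `CompletesAsync`, and the `sink` block are hypothetical names made up for illustration, and the one-second timeout is there only so the broken variant reports failure instead of hanging.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the question's code.
public static class CompletionPropagationSketch
{
    // Builds buffer -> broadcast -> sink and reports whether the final
    // block reached completion before the timeout elapsed.
    public static async Task<bool> CompletesAsync(bool propagateFromBuffer)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<string>();
        var broadcast = new BroadcastBlock<string>(symbol => symbol);
        var sink = new ActionBlock<string>(symbol => { /* stand-in for the real work */ });

        if (propagateFromBuffer)
            buffer.LinkTo(broadcast, linkOptions); // completion flows downstream
        else
            buffer.LinkTo(broadcast);              // as in the question: completion stops here

        broadcast.LinkTo(sink, linkOptions);

        buffer.Post("F");
        buffer.Post("BAC");
        buffer.Complete();

        // Race the sink's completion against a timeout so the caller always gets an answer.
        var finished = await Task.WhenAny(sink.Completion, Task.Delay(TimeSpan.FromSeconds(1)));
        return finished == sink.Completion;
    }
}
```

Calling `CompletesAsync(true)` should report completion, while `CompletesAsync(false)` should time out, which matches the symptom in the question: every item is processed, yet the last block's `Completion` task never finishes.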

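Also, since the method already returns a Task, it does not need to return Task.CompletedTask; you can simply use async/await instead of blocking with .Wait(). And what do you expect the caller to do with null when it awaits this method?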

    if (GenerateCompleteReport.Completion.IsCompletedSuccessfully) { return Task.CompletedTask; }

    return null;

Instead, you can:

    await GenerateCompleteReport.Completion;
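Put together, an async version of the method might look roughly like the sketch below. It is a simplified stand-in rather than the original pipeline: `AsyncPipelineSketch`, `RunAsync`, and the two `ActionBlock` branches are hypothetical, and the branches only record strings instead of calling the real retrieval methods. It also shows one way to wait on several branches explicitly, by awaiting `Task.WhenAll` over their `Completion` tasks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the question's code.
public static class AsyncPipelineSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> symbols)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var results = new List<string>();
        var gate = new object(); // guards results, since the branches run concurrently

        var buffer = new BufferBlock<string>();
        var broadcast = new BroadcastBlock<string>(s => s);
        var dividends = new ActionBlock<string>(s => { lock (gate) results.Add("dividends:" + s); });
        var keyStats = new ActionBlock<string>(s => { lock (gate) results.Add("keyStats:" + s); });

        // Every link, including the one from the buffer, passes the options.
        buffer.LinkTo(broadcast, linkOptions);
        broadcast.LinkTo(dividends, linkOptions);
        broadcast.LinkTo(keyStats, linkOptions);

        foreach (var symbol in symbols)
            await buffer.SendAsync(symbol);
        buffer.Complete();

        // Await instead of blocking with .Wait(); WhenAll covers every branch explicitly.
        await Task.WhenAll(dividends.Completion, keyStats.Completion);
        return results;
    }
}
```

A caller would then write `var results = await AsyncPipelineSketch.RunAsync(new[] { "F", "BAC" });` and never has to deal with a null task. Any exception thrown inside a block faults that block's `Completion` task and is rethrown at the `await`.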
