c# - TPL 数据流管道完成未从等待返回
问题描述
即使所有数据都已处理并显示在控制台上,我的管道也没有完成注册。我将它设置为等待完成,但它永远不会完成并且不允许该方法返回。
TransformBlock<string, CompanyInfo> GetCompanyInfo;
TransformBlock<string, List<Dividend>> GetDividendReports;
TransformBlock<string, KeyStats> GetKeyStatInfo;
TransformBlock<string, List<Interval>> GetIntervalReports;
TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
BroadcastBlock<string> broadcastSymbol;
TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
ActionBlock<string> GenerateCompleteReport;
CancellationTokenSource cancellationTokenSource;
public Task StartPipeline()
{
cancellationTokenSource = new CancellationTokenSource();
ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationTokenSource.Token,
MaxDegreeOfParallelism = MAXPARA
};
broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });
GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
{
return RetrieveCompanyInfo(symbol);
}, executionDataflowBlockOptions);
GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
{
return RetrieveDividendInfo(symbol);
}, executionDataflowBlockOptions);
GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
{
return RetrieveKeyStats(symbol);
}, executionDataflowBlockOptions);
GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
{
return RetrieveIntervals(symbol, 30);
}, executionDataflowBlockOptions);
GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
{
return ConstructIntervalReport(intervals);
}, executionDataflowBlockOptions);
GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
{
var ReportObj = new Report
{
changeIntervals = tup.Item1,
dividends = tup.Item2,
keyStats = tup.Item3
};
XmlSerializer ser = new XmlSerializer(typeof(Report));
var stringWriter = new StringWriter();
ser.Serialize(stringWriter, ReportObj);
return stringWriter.ToString();
}, executionDataflowBlockOptions);
GenerateCompleteReport = new ActionBlock<string>(xml =>
{
var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
File.WriteAllText(str, xml);
Console.WriteLine("Finished File");
}, executionDataflowBlockOptions);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var buffer = new BufferBlock<string>();
buffer.LinkTo(broadcastSymbol);
//Broadcasts the symbol
broadcastSymbol.LinkTo(GetIntervalReports, options);
broadcastSymbol.LinkTo(GetDividendReports, options);
broadcastSymbol.LinkTo(GetKeyStatInfo, options);
//Second teir parallel
GetIntervalReports.LinkTo(GetChangesOverInterval, options);
//Joins the parallel blocks back together
GetDividendReports.LinkTo(joinblock.Target2, options);
GetKeyStatInfo.LinkTo(joinblock.Target3, options);
GetChangesOverInterval.LinkTo(joinblock.Target1, options);
joinblock.LinkTo(GenerateXmlString, options);
GenerateXmlString.LinkTo(GenerateCompleteReport, options);
buffer.Post("F");
buffer.Post("AGFS");
buffer.Post("BAC");
buffer.Post("FCF");
buffer.Complete();
GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);
}
我不确定为什么它没有从管道返回异常或完成。当程序运行时,它会显示所有正在创建并停止的文件,但在等待完成后不会执行任何代码。PropagateCompletion 不应该允许块知道它们何时完成了它们的动作或转换吗?
解决方案
您没有将链接选项传递给您的BufferBlock
所以完成不会被传播。另一方面,只有一个链接块将从您的BroadcastBlock
. 如果您想等待所有三个链接块,则必须自己明确处理。有关示例,请参见此处
此外,由于该方法已经返回 aTask
它不需要返回 a Task.CompletedTask
,您可以简单地使用async/sawait
而不是阻塞 with .Wait()
。您希望调用者使用await
此方法做null
什么?
if (GenerateCompleteReport.Completion.IsCompletedSuccessfully) { return Task.CompletedTask; }
return null;
相反,您可以:
await enerateCompleteReport.Completion
推荐阅读
- jhipster - 登录后keycloak返回错误未找到活动密钥,
- apache - 无法建立连接,因为目标机器主动拒绝它 apache wampserver
- origen-sdk - 以编程方式访问 origen save 命令
- json - 在类似 JSON 的文档中为非数字原子值添加引号
- excel - 在 IF/CountIf 中跳过单元格
- ffmpeg - 将两个视频并排放置,提高性能
- c# - C# Selenium 函数仍在等待加载选项的位置
- php - 我从 CoinmarketCap API PHP 获取价格
- objective-c - 如何使用点符号属性访问为(不)配置错误/警告?
- stata - 用户可以决定是循环代码还是只运行一次?