c# - TPL Dataflow C# 等待所有链接块完成
问题描述
我正在使用 TPL 数据流来构建管道。此管道应在逻辑上执行以下操作:
- 首先处理多个数据项 - 假设它是
pollingBlock
. - 如果满足某些条件,则将其中一项(已满足条件)传递给某个块以进行进一步监控,假设它是
monitoringBlock
. 每个monitoringBlock
只能容纳 1 个项目,但有多个monitoringBlocks
. pollingBlock
应继续处理所有项目,包括以某种while (true)
方式发布的项目。monitoringBlocks
虽然占用不应该接受任何其他消息,并且这些消息应该被删除而无需进一步处理。- 在
monitoringBlock
消息中的一些处理应该被标记为完成或转移到下一个块进行处理之后,这个下一个块是processingBlock
一个简短的示例:
public Task ExecutePipeline()
{
var block = CreatePollingPipeline();
block.Post((_serviceOne, _serviceTwo));
block.Complete();
return block.Completion;
}
public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();
var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
{
while (true)
{
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
}
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
Console.WriteLine("monitoringBlock started");
Thread.Sleep(5000);
Console.WriteLine("monitoringBlock completed");
return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());
var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
{
Console.WriteLine("processingBlock started");
Thread.Sleep(2000);
Console.WriteLine("processingBlock completed");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
return pollingBlock;
}
我的问题是我如何保持monitoringBlock
忙碌,直到链接processingBlock
完成它的工作?我不希望monitoringBlock
在消息完成完整处理周期之前发布任何项目。
解决方案
正如评论中已经提到的,您可以简单地将monitoringBlock
and的逻辑封装processingBlock
在一个块中,例如,您可以通过预定义的Datablock.Encapsulate
方法来实现。
但是,如果您不想这样做,您可以使用AutoResetEvent
或类似的抽象,您的代码可能是这样的:
AutoResetEvent dataflowEvent = new AutoResetEvent(true);
var bufferBlock = new ActionBLock<(string input1, string input2)>(i =>
{
dataflowEvent.WaitOne();
monitoringBlock.Post(i);
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
Console.WriteLine("monitoringBlock started");
Thread.Sleep(5000);
Console.WriteLine("monitoringBlock completed");
dataflowEvent.Set();
return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
推荐阅读
- javascript - 如何一个接一个地多次运行柏树测试?
- performance - 交换内存中未对齐的 64 位值的字节的最快方法是什么?
- python - 如何在 Tensorflow Keras 中标准化我的图像数据
- node.js - 无法读取未定义的属性“名称”,而“名称”甚至不在代码中
- php - 阅读框架 laravel
- python - 将 pandas 列与列表中的值进行比较并返回更高的下限值
- javascript - Node.js MSSQL包查询功能不返回数据
- sql-server - dacpac 文件向 LocalDB 发布错误:“无法部署该元素。该元素包含无法在目标数据库中重新创建的状态。”
- python - Finding the frequency that each column is a row minimum
- python - 删除熊猫列中的特定单词