首页 > 解决方案 > TPL Dataflow C# 等待所有链接块完成

问题描述

我正在使用 TPL 数据流来构建管道。此管道应在逻辑上执行以下操作:

  1. 首先处理多个数据项 - 假设它是pollingBlock.
  2. 如果满足某些条件,则将其中一项(已满足条件)传递给某个块以进行进一步监控,假设它是monitoringBlock. 每个monitoringBlock只能容纳 1 个项目,但有多个monitoringBlocks.
  3. pollingBlock应继续处理所有项目,包括以某种while (true)方式发布的项目。
  4. monitoringBlocks虽然占用不应该接受任何其他消息,并且这些消息应该被删除而无需进一步处理。
  5. monitoringBlock消息中的一些处理应该被标记为完成或转移到下一个块进行处理之后,这个下一个块是processingBlock

一个简短的示例:

public Task ExecutePipeline()
{
    var block = CreatePollingPipeline();
    block.Post((_serviceOne, _serviceTwo));

    block.Complete();
    return block.Completion;
}

public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
    var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();

    var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
    {
        while (true)
        {
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);
        }
    });

    var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
        {
            Console.WriteLine("monitoringBlock started");
            Thread.Sleep(5000);
            Console.WriteLine("monitoringBlock completed");

            return (inputs.input1, inputs.input2);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

    pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
        inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
    pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());

    var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
    {
        Console.WriteLine("processingBlock started");
        Thread.Sleep(2000);
        Console.WriteLine("processingBlock completed");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
    monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });


    return pollingBlock;
}

我的问题是我如何保持monitoringBlock忙碌,直到链接processingBlock完成它的工作?我不希望monitoringBlock在消息完成完整处理周期之前发布任何项目。

标签: c#.net-coreasync-awaittask-parallel-librarytpl-dataflow

解决方案


正如评论中已经提到的,您可以简单地将monitoringBlockand的逻辑封装processingBlock在一个块中,例如,您可以通过预定义的Datablock.Encapsulate方法来实现。

但是,如果您不想这样做,您可以使用AutoResetEvent或类似的抽象,您的代码可能是这样的:

AutoResetEvent dataflowEvent = new AutoResetEvent(true);
var bufferBlock = new ActionBLock<(string input1, string input2)>(i =>
{
    dataflowEvent.WaitOne();
    monitoringBlock.Post(i);
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
    {
        Console.WriteLine("monitoringBlock started");
        Thread.Sleep(5000);
        Console.WriteLine("monitoringBlock completed");

        dataflowEvent.Set();
        return (inputs.input1, inputs.input2);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

推荐阅读