首页 > 解决方案 > 如何在转换过程中实现具有嵌入式死区时间的 TransformBlock?

问题描述

背景:我正在尝试解决将重试策略嵌入工作流的问题,但我对目前找到的解决方案并不是特别满意。所以我在这里发布这个问题的一个通用版本,如果存在的话,要求一个更好的解决方案。

这是一个普遍的问题:我有一个TransformBlock对其接收的项目应用冗长的转换。这种转换至少包括两个 I/O 操作,它们之间的社交距离应该是一个特定的最小时间延迟。在此延迟期间,TransformBlock应允许处理其他项目。换句话说,这种延迟不应计入MaxDegreeOfParallelism块的限制。这个时间跨度可以被认为是处理项目的“死时间”,但不应该是整个块的死时间。这是我要实现的目标的示例:

TransformBlock<int, string> block = CreateDeadTimeTransformBlock<int, string>(
    async (item, deadTime) =>
{
    Console.WriteLine($"Processing #{item}/A Started");
    await Task.Delay(5000); // Simulate some I/O operation
    Console.WriteLine($"Processing #{item}/A Finished");

    await deadTime.MinimumDelayAsync(TimeSpan.FromSeconds(10));
    //await Task.Delay(TimeSpan.FromSeconds(10)); // Undesirable

    Console.WriteLine($"Processing #{item}/B Started");
    await Task.Delay(5000); // Simulate some I/O operation
    Console.WriteLine($"Processing #{item}/B Finished");
    return item.ToString();
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

在上面的示例中,deadTime对象与已处理的项目一起传递,客户端代码可以调用其MinimumDelayAsync方法来激活死时间间隔。所以在前两个项目完成了它们的“A”阶段并进入了他们的死区时间之后,第3和第4个项目的处理应该能够立即开始,而不必等待“B”的完成第 1 项和第 2 项的阶段。

到目前为止,我最好的尝试是引入一个SemaphoreSlim用于控制并发级别,并通过使其不受限制来禁用它的MaxDegreeOfParallelism功能。TransformBlock这产生了项目以随机顺序获取信号量的问题,而不是它们发布到块的顺序。我注意到这个问题可以通过配置具有TaskScheduler有限并发的块来解决,等于指定的MaxDegreeOfParallelism. 我不是 100% 确定这是否是解决该问题的可靠解决方案,我也不对发布到块的所有项目都作为昂贵的延续而最终进入信号量队列而不是经济地留在输入中这一事实感到兴奋块的队列。

这种方法的另一个问题是它优先处理新项目的“A”阶段,而不是已经在进行中的项目的“B”阶段。可以通过配置BoundedCapacity选项来部分控制此行为,方法是设置可以同时进行的项目总数的限制。

该解决方案的积极方面是它保留了 的全部功能TransformBlock,尤其是它的EnsureOrderedBoundedCapacity选项。

我错过了更好的解决方案吗?

这是我目前实现该CreateDeadTimeTransformBlock方法的最佳尝试:

public class DeadTime
{
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationToken _cancellationToken;

    public DeadTime(SemaphoreSlim semaphore, CancellationToken cancellationToken)
    {
        _semaphore = semaphore;
        _cancellationToken = cancellationToken;
    }

    public async Task MinimumDelayAsync(TimeSpan timeSpan)
    {
        _semaphore.Release();
        try
        {
            await Task.Delay(timeSpan, _cancellationToken);
        }
        finally
        {
            await _semaphore.WaitAsync();
        }
    }
}

public static TransformBlock<TInput, TOutput>
    CreateDeadTimeTransformBlock<TInput, TOutput>(
    Func<TInput, DeadTime, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    var cancellationToken = dataflowBlockOptions.CancellationToken;
    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        // This is not an interesting case
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var deadTime = new DeadTime(semaphore, cancellationToken);

    var block = new TransformBlock<TInput, TOutput>(async item =>
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            return await transform(item, deadTime);
        }
        finally
        {
            semaphore.Release();
        }
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
    return block;
}

小提琴完整的例子。

标签: c#concurrencytpl-dataflow

解决方案


推荐阅读