c# - 如何在转换过程中实现具有嵌入式死区时间的 TransformBlock?
问题描述
背景:我正在尝试解决将重试策略嵌入工作流的问题,但我对目前找到的解决方案并不是特别满意。所以我在这里发布这个问题的一个通用版本,如果存在的话,要求一个更好的解决方案。
这是一个普遍的问题:我有一个TransformBlock
对其接收的项目应用冗长的转换。这种转换至少包括两个 I/O 操作,它们之间的社交距离应该是一个特定的最小时间延迟。在此延迟期间,TransformBlock
应允许处理其他项目。换句话说,这种延迟不应计入MaxDegreeOfParallelism
块的限制。这个时间跨度可以被认为是处理项目的“死时间”,但不应该是整个块的死时间。这是我要实现的目标的示例:
TransformBlock<int, string> block = CreateDeadTimeTransformBlock<int, string>(
async (item, deadTime) =>
{
Console.WriteLine($"Processing #{item}/A Started");
await Task.Delay(5000); // Simulate some I/O operation
Console.WriteLine($"Processing #{item}/A Finished");
await deadTime.MinimumDelayAsync(TimeSpan.FromSeconds(10));
//await Task.Delay(TimeSpan.FromSeconds(10)); // Undesirable
Console.WriteLine($"Processing #{item}/B Started");
await Task.Delay(5000); // Simulate some I/O operation
Console.WriteLine($"Processing #{item}/B Finished");
return item.ToString();
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2
});
在上面的示例中,deadTime
对象与已处理的项目一起传递,客户端代码可以调用其MinimumDelayAsync
方法来激活死时间间隔。所以在前两个项目完成了它们的“A”阶段并进入了他们的死区时间之后,第3和第4个项目的处理应该能够立即开始,而不必等待“B”的完成第 1 项和第 2 项的阶段。
到目前为止,我最好的尝试是引入一个SemaphoreSlim
用于控制并发级别,并通过使其不受限制来禁用它的MaxDegreeOfParallelism
功能。TransformBlock
这产生了项目以随机顺序获取信号量的问题,而不是它们发布到块的顺序。我注意到这个问题可以通过配置具有TaskScheduler
有限并发的块来解决,等于指定的MaxDegreeOfParallelism
. 我不是 100% 确定这是否是解决该问题的可靠解决方案,我也不对发布到块的所有项目都作为昂贵的延续而最终进入信号量队列而不是经济地留在输入中这一事实感到兴奋块的队列。
这种方法的另一个问题是它优先处理新项目的“A”阶段,而不是已经在进行中的项目的“B”阶段。可以通过配置BoundedCapacity
选项来部分控制此行为,方法是设置可以同时进行的项目总数的限制。
该解决方案的积极方面是它保留了 的全部功能TransformBlock
,尤其是它的EnsureOrdered
和BoundedCapacity
选项。
我错过了更好的解决方案吗?
这是我目前实现该CreateDeadTimeTransformBlock
方法的最佳尝试:
public class DeadTime
{
private readonly SemaphoreSlim _semaphore;
private readonly CancellationToken _cancellationToken;
public DeadTime(SemaphoreSlim semaphore, CancellationToken cancellationToken)
{
_semaphore = semaphore;
_cancellationToken = cancellationToken;
}
public async Task MinimumDelayAsync(TimeSpan timeSpan)
{
_semaphore.Release();
try
{
await Task.Delay(timeSpan, _cancellationToken);
}
finally
{
await _semaphore.WaitAsync();
}
}
}
public static TransformBlock<TInput, TOutput>
CreateDeadTimeTransformBlock<TInput, TOutput>(
Func<TInput, DeadTime, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
var cancellationToken = dataflowBlockOptions.CancellationToken;
var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
var taskScheduler = dataflowBlockOptions.TaskScheduler;
SemaphoreSlim semaphore;
if (maxDOP == DataflowBlockOptions.Unbounded)
{
// This is not an interesting case
semaphore = new SemaphoreSlim(Int32.MaxValue);
}
else
{
semaphore = new SemaphoreSlim(maxDOP, maxDOP);
// The degree of parallelism is controlled by the semaphore
dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
// Use a limited-concurrency scheduler for preserving the processing order
dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
taskScheduler, maxDOP).ConcurrentScheduler;
}
var deadTime = new DeadTime(semaphore, cancellationToken);
var block = new TransformBlock<TInput, TOutput>(async item =>
{
await semaphore.WaitAsync(cancellationToken);
try
{
return await transform(item, deadTime);
}
finally
{
semaphore.Release();
}
}, dataflowBlockOptions);
dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
return block;
}
小提琴完整的例子。
解决方案
推荐阅读
- django - Django Saleor 在 Windows 上安装 Dashboard 2.0
- c++ - LAPACKE 函数中对角化所需的完整矩阵或三角部分?
- java - 为什么从 Lock 创建条件,而不是使用“new”运算符?
- json - 如何解决 Odoo 控制器中的 SSL 错误分配键值
- sql - 在 Excel 公式中引用日期单元格时保持单元格格式
- javascript - 如何将包含符号的数组转换为字符串?
- python - Anaconda env 正在使用另一个 env 的站点包
- python - 关于 python argparse
- ios - JSC:致命:尝试释放访问权限,但 mutator 没有访问权限
- java - 运行 Eclipse 应用程序时的 log4j 警告