首页 > 解决方案 > 如何对连接的 TPL 数据流块进行错误处理?

问题描述

看来我不了解 TPL Dataflow 错误处理。

假设我有一个要处理的项目列表,并且为此使用了 ActionBlock:

var actionBlock = new ActionBlock<int[]>(async tasks =>
{
    foreach (var task in tasks)
    {
        await Task.Delay(1);

        if (task > 30)
        {
            throw new InvalidOperationException();
        }

        Console.WriteLine("{0} Completed", task);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 200,
    MaxDegreeOfParallelism = 4
});

for (i = 0; i < 10000; i++)
{
    if (!await bufferBlock.SendAsync(i))
    {
        break;
    }
}

actionBlock.Complete();

await actionBlock.Completion;

如果发生错误,块将转换为故障状态,并且 SendAsync(...) 返回 false。我可以停止我的循环并完成它,当我等待完成时会引发异常。到目前为止,一切都很好。

当我在两者之间放置一个 BufferBlock 时,它不再起作用:

bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (i = 0; i < 10000; i++)
{
    if (!await bufferBlock.SendAsync(i, cts.Token))
    {
        break;
    }
}

bufferBlock.Complete();

await actionBlock.Completion;

对 SendAsync() 的调用只是永远“阻塞”,因为 BufferBlock 永远不会转换到故障状态。

我找到的唯一解决方案是:

using (var cts = new CancellationTokenSource())
{
    actionBlock.Completion.ContinueWith(x =>
    {
        if (x.Status != TaskStatus.RanToCompletion)
        {
            cts.Cancel();
        }
    });

    var i = 0;

    try
    {
        for (i = 0; i < 10000; i++)
        {
            if (cts.Token.IsCancellationRequested)
            {
                break;
            }

            if (!await bufferBlock.SendAsync(i, cts.Token))
            {
                break;
            }
        }
    }
    catch (OperationCanceledException)
    {
    }

    bufferBlock.Complete();

    await actionBlock.Completion;
}

因为状态传播,我必须听我网络中最后一个块的状态,当这个块停止时,我必须停止我的循环。

这是使用 Dataflow 库的预期方式还是有更好的解决方案?

标签: c#task-parallel-librarytpl-dataflow

解决方案


不允许未处理的异常。块中未处理的异常意味着块以及整个管道最终被破坏并且必须中止。这不是 TPL 数据流错误,而是整个数据流范例的工作方式。异常是为了向调用堆栈发出错误信号。但是,数据流中没有调用堆栈。

块是通过消息进行通信的独立工作者。链接块和故障块之间没有所有权关系并不意味着任何先前或后续块也应该中止。PropagateCompletion这就是为什么false默认情况下。

如果一个源链接到多个块,则消息可以轻松转到其他块。也可以在运行时更改块之间的链接。

在管道中有两种不同类型的错误:

  • 块/actor/worker 处理消息时发生的消息错误
  • 使管道无效并可能需要中止的管道错误

如果单个消息出现故障,则没有理由中止管道。

消息错误

如果在处理消息时出现问题,参与者应该对该消息执行一些操作并继续处理下一个消息。那可能是:

  • 记录错误并继续
  • 向另一个块发送“错误”消息
  • 在整个管道中使用一个Result<TMessage,TError>类而不是使用原始消息类型,并将任何错误添加到结果中

重试和恢复策略可以建立在此之上,例如将任何失败的消息转发到“重试”块或死消息块

最简单的方法是捕获异常并记录它们:

var block=new ActionBlock<int[]>(msg=>{
    try
    {
    ...
    }
    catch(Exception exc)
    {
        _logger.LogError(exc);
    }
});

另一种选择是手动发布到例如死信队列:

var dead=new BufferBlock<(int[] data,Exception error)>();

var block=new ActionBlock<int[]>(msg=>{
    try
    {
    ...
    }
    catch(Exception exc)
    {
        await _dead.SendAsync(msg,exc);
        _logger.LogError(exc);
    }
});

更进一步,可以定义一个Result<TMessage,TError>类来包装结果。下游块可以忽略错误结果。LinkTo谓词还可用于重新路由错误消息。我会作弊并将错误硬编码为Exception. 更好的实现将使用不同的类型来表示成功和错误:

record Result<TMessage>(TMessage? Message,Exception ? Error)
{
    public bool HasError=>error!=null;
}


var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>{
    if (msg.HasError)
    {
        //Propagate the error message
        return new Result<double>(default,msg.Error);
    }
    try
    {
       var sum=(double)msg.Message.Sum();
       if (sum % 5 ==0)
       {
           throw new Exception("Why not?");
       }
       return new Result(sum,null);
    }
    catch(Exception exc)
    {
        return new Result(null,exc);
        
    }
});

var block2=new ActionBlock<Result<double>>(...);

block1.LinkTo(block2);

另一种选择是将错误消息重定向到不同的块:


var errorBlock=new ActionBlock<Result<int[]>>(msg=>{
    _logger.LogError(msg.Error);
});

block1.LinkTo(errorBlock,msg=>msg.HasError);
block1.LinkTo(block2);

这会将所有错误消息重定向到错误块。所有其他消息继续block2

管道错误

在某些情况下,错误非常严重,以至于当前块无法恢复,甚至可能必须取消/中止整个管道。.NET 中的取消是通过 CancellationToken 处理的。所有块都接受 CancellationToken 以允许中止。

没有适用于所有管道的单一中止策略。向前传播取消是常见的,但绝对不是唯一的选择。

在最简单的情况下,

var pipeLineCancellation = new CancellationTokenSource();


var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>{
    ...
}, 
new ExecutionDataflowBlockOptions {
   CancellationToken=pipeLineCancellation.Token
});

如果出现严重错误,块异常处理程序可能会请求取消:

    //Wrong table name. We can't use the database
    catch(SqlException exc) when (exc.Number ==208)
    {
       ...
       pipeLineCancellation.Cancel();
    }

这将中止使用相同 CancellationTokenSource 的所有块。这并不意味着所有块都应该连接到同一个 CancellationTokenSource。

向后流动取消

在 Go 管道中,通常使用error向前一个块发送取消消息的通道。CancellationTokenSource在 C# 中使用linked s也可以做到这一点。甚至可以说这比 Go 还要好。

可以使用CreateLinkedTokenSource创建多个链接的 CancellationTokenSources 。通过创建向后链接的源,我们可以为其自己的源进行块信号消除,并使消除流到根。

var cts5=new CancellationTokenSource();
var cts4=CancellationTokenSource.CreateLinkedTokenSource(cts5.Token);
...
var cts1=CancellationTokenSource.CreateLinkedTokenSource(cts2.Token);

...
var block3=new TransformBlock<Result<int[]>,Result<double>>(msg=>{
    ...
    catch(SqlException) 
    {
        cts3.Cancel();
    }
}, 
new ExecutionDataflowBlockOptions {
   CancellationToken=cts3.Token
});

这将在不取消下游块的情况下,逐块地向后发出取消信号。

管道模式

.NET 中的数据流是鲜为人知的瑰宝,因此很难找到好的参考和模式。不过,Go 中的概念相似,因此可以使用Go 并发模式:管道和取消中的模式。

TPL 数据流实现了处理循环和完成传播,因此通常只需要提供处理消息的Actionor Func。其余的模式必须实现,尽管 .NET 提供了一些优于 Go 的优势。

  • done通道本质上是一个 CancellationTokenSource 。
  • 扇入、扇出已经通过现有块处理,或者可以使用克隆消息的相对简单的自定义块处理
  • CancellationTokenSources 可以显式链接。在 Go 中,每个“阶段”(本质上是一个块)都必须将完成/取消传播到其他阶段
  • 所有阶段/块都可以使用一个 CancellationTokenSource。
  • 链接不仅允许更轻松的组合,还允许对管道/网格进行运行时修改。

假设我们想在一段时间后停止处理消息,即使没有错误。所需要做的就是创建一个所有块都使用的 CTS:

var pipeLineCancellation = new CancellationTokenSource();


var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>{
    ...
}, 
new ExecutionDataflowBlockOptions {
   CancellationToken=pipeLineCancellation.Token
});
var block2 =.....;

pipeLineCancellation.Cancel();

也许我们只想让管道运行一分钟?轻松搭配

var pipeLineCancellation = new CancellationTokenSource(60000);

也有一些缺点,因为 Dataflow 块无法访问“通道”或控制循环

  • 在 Go 中,很容易将data,errordone通道传递到每个阶段,简化错误报告和完成。在 .NET 中,块委托可能必须直接访问其他块或 CTS。
  • 在 Go 中,使用公共状态来积累数据或管理会话/远程连接状态更容易。想象一下控制像 Selenium 这样的屏幕刮板的舞台/块。我们真的不想在每条消息上都重新启动浏览器。

或者我们可能想使用 SqlBulkCopy 将数据插入数据库。使用 ActionBlock,我们必须为每个批次创建一个新实例,这可能是也可能不是问题。


推荐阅读