首页 > 解决方案 > 当 MaxDegreeOfParallelism > 1 时,Parallel.For 的 localInit 和 localFinally 等效用于 TPL 数据流块

问题描述

我有TransformBlock<int, int>一个MaxDegreeOfParallelism = 6。我还发现,Func<int, int>传递给块的构造函数(为每个发布的项目执行)可以在逻辑上分解为一个昂贵的初始化例程和一个改变函数局部变量的主体。如果我可以将函数重构为一个名为 的类TransformBlockState,每个并发操作执行一次初始化(就像Parallel.ForlocalInit回调),然后允许 TPL 数据流确保状态永远不会被超过一个项目改变,那么效率会更高。时间。

重构前:

Func<int, int> original = x => {
    // method local variables
    // expensive initialization routine to setup locals
    // perform action on local variables
    // potentially expensive teardown
}

重构后:

public sealed class TransformBlockState<TIn, TOut> : IDisposable
{
    // instance state

    public TransformBlockState()
    {
        // expensive initialization routine
    }

    public TOut Transform(TIn value)
    {
        // called many times but never concurrently for the same instance
    }

    public void Dispose()
    {
        // tear down state
    }
}

TPL Dataflow 库中是否已经存在类似于localInit(for .ctor) 和localFinally(for ) 回调的东西?Dispose

我想避免有ConcurrentStack<TransformBlockState>(很多不必要的锁定)并且我想避免将 存储TransformBlockState在一个[ThreadStatic]字段中(因为不能保证Task不会在多个线程上运行(显然是顺序的)或Task在单个线程上运行多个 s (也许I/O 上的所有阻塞))。

标签: c#task-parallel-librarytpl-dataflow

解决方案


我想我有一个更好的例子——我需要从航空公司(实际上是 GDS)获得几千张机票记录。为此,我需要先建立一个昂贵的会话,然后才能发送 SOAP 或 REST 请求。会话受到限制,所以我真的不想为每张票创建一个新的。它使每个请求所需的时间加倍,浪费金钱和资源。

创建自定义块似乎是解决方案,但实际上并不是那么好。数据流建立处理消息流的处理块管道。试图让它们以不同的方式工作将与它们对数据流模型的基本假设发生冲突。

例如,Tasks 用于并行性、节流和负载平衡 - MaxMessagesPerTask选项会在收到最大数量的消息后终止任务,这样一个任务就不会长时间占用 CPU。每个任务创建和销毁会话会破坏该机制最终创建不必要的会话。

汇集

处理这个问题的一种方法是使用一个对象池,其中包含将由块使用的“昂贵”对象,在本例中为 Sessions。令人恼火的是,Microsoft.Extensions.ObjectPool包就提供了这样一个池。文档是不存在的,它们被欺骗性地放置在ASP.NET树中,但这是一个独立的 .NET Standard 2.0 包。Github源代码看似简单,并且该类使用 Interlocked.CompareExchange 来避免锁定。甚至还有一个 LeakTrackingObjectPool 实现。

如果我过去知道这一点,我可以写:

var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());

DefaultPooledObjectPolicy策略仅用于new创建新实例。不过,创建新策略很容易,例如使用自己的创建逻辑甚至工厂方法的策略:

public class SessionPolicy : DefaultPooledObjectPolicy<Session>
{
    public override Session Create()
    {
        //Do whatever is needed here
        return session;
    }
}

重定向

另一种选择是使用多个块实例并让源块链接到所有实例。为了避免将所有消息发送到第一个块,需要有界容量。假设我们有这个工厂方法:

TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)
{
    var session=CreateSomeSessionFrom(someSettings);
    var bounded=new DataflowBlockOptions {BoundedCapacity =1};
    return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);
}

并使用它来创建多个块:

_blocks=Enumerable.Range(0,10)
                  .Select(_=>CreateThatBlockWithSession(settings))
                  .ToArray();

源块可以连接到所有这些块:

foreach(var target in _blocks)
{
    _source.LinkTo(target,options);
}

然后,将所有这些块链接到下一个块。这里棘手的部分是我们不能仅仅传播完成。如果其中一个块完成,它将强制下一个块完成,即使其他块中有消息等待。

解决方案是使用Task.WhenAllContinueWith推动完成到下一个块:

foreach(var target in _blocks)
{
    target.LinkTo(_nextBlock);
}

var allTasks=_blocks.Select(blk=>blk.Completion);
Task.WhenAll(allTasks)
    .ContinueWith(_=>_nextBlock.Complete());

更健壮的实现将检查IsFaulted所有任务的状态并Fault()在其中一个失败时调用下一个块


推荐阅读