首页 > 解决方案 > 阻止 BroadcastBlock 在 LinkTo 上发送缓冲消息

问题描述

给定BroadcastBlock缓冲区中有一条消息,是否可以阻止该消息发送到新链接的目标?例如:

static void Main(string[] args)
{
    var myBroadcastBlock = new BroadcastBlock<string>(msg => msg);
    var myActionBlock = new ActionBlock<string>(msg => Console.WriteLine(msg));

    myBroadcastBlock.Post("Hello World!"); // No linked targets here.

    myBroadcastBlock.LinkTo(myActionBlock); // Link a target.

    // etc.
}

此代码将打印“Hello World”。基本上,尽管在建立链接之前已经发布了消息,但BroadcastBlock仍然会将缓冲的消息发送到ActionBlockon 。.LinkTo

有没有内置的方法来防止这种行为?我只想将消息发送到当前链接,而不是将来的链接。

我正在使用System.Threading.Tasks.Dataflow 4.11.1

标签: c#task-parallel-librarytpl-dataflow

解决方案


使用内置BroadcastBlock类无法实现此行为。它的行为是不可配置的。如果你迫切需要这种行为,你可以尝试下面的实现。它使用一个内部BroadcastBlock<(T, long)>索引,该索引随每条新消息而增加,以便在链接期间可以过滤掉当前活动的消息。

BroadcastBlockNewOnly由于需要从Tto(T, long)和 back to转换,因此类内部有相当多的间接性T。这使得类难以维护,效率也不高。在收到的每条消息上都会分配一个新对象,为垃圾收集器创建更多工作,因此请谨慎使用此类。

public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
{
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;
    private long _index;

    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
            throw new ArgumentNullException(nameof(cloningFunction));
        _broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
        {
            var (value, index) = entry;
            return (cloningFunction(value), index);
        }, dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    public Task Completion => _broadcastBlock.Completion;
    public void Complete() => _broadcastBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));
        var currentIndex = Interlocked.CompareExchange(ref _index, 0, 0);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    private long GetNewIndex() => Interlocked.Increment(ref _index);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        var sourceProxy = source != null ?
            new SourceProxy(source, this, GetNewIndex) : null;
        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        var (value, index) = _broadcastBlock.ConsumeMessage(header, targetProxy,
            out messageConsumed);
        return value;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        return _broadcastBlock.ReserveMessage(header, targetProxy);
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        _broadcastBlock.ReleaseReservation(header, targetProxy);
    }

    private class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;
            if (index <= _indexLimit) return DataflowMessageStatus.Declined;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    private class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            var value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);
            var newIndex = _getNewIndex();
            return (value, newIndex);
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    private class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }

}

推荐阅读