c# - 阻止 BroadcastBlock 在 LinkTo 上发送缓冲消息
问题描述
给定BroadcastBlock
缓冲区中有一条消息,是否可以阻止该消息发送到新链接的目标?例如:
static void Main(string[] args)
{
var myBroadcastBlock = new BroadcastBlock<string>(msg => msg);
var myActionBlock = new ActionBlock<string>(msg => Console.WriteLine(msg));
myBroadcastBlock.Post("Hello World!"); // No linked targets here.
myBroadcastBlock.LinkTo(myActionBlock); // Link a target.
// etc.
}
此代码将打印“Hello World”。基本上,尽管在建立链接之前已经发布了消息,但BroadcastBlock
仍然会将缓冲的消息发送到ActionBlock
on 。.LinkTo
有没有内置的方法来防止这种行为?我只想将消息发送到当前链接,而不是将来的链接。
解决方案
使用内置BroadcastBlock
类无法实现此行为。它的行为是不可配置的。如果你迫切需要这种行为,你可以尝试下面的实现。它使用一个内部BroadcastBlock<(T, long)>
索引,该索引随每条新消息而增加,以便在链接期间可以过滤掉当前活动的消息。
BroadcastBlockNewOnly
由于需要从T
to(T, long)
和 back to转换,因此类内部有相当多的间接性T
。这使得类难以维护,效率也不高。在收到的每条消息上都会分配一个新对象,为垃圾收集器创建更多工作,因此请谨慎使用此类。
public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
{
private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;
private long _index;
public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
DataflowBlockOptions dataflowBlockOptions = null)
{
if (cloningFunction == null)
throw new ArgumentNullException(nameof(cloningFunction));
_broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
{
var (value, index) = entry;
return (cloningFunction(value), index);
}, dataflowBlockOptions ?? new DataflowBlockOptions());
}
public Task Completion => _broadcastBlock.Completion;
public void Complete() => _broadcastBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
if (target == null) throw new ArgumentNullException(nameof(target));
var currentIndex = Interlocked.CompareExchange(ref _index, 0, 0);
var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
}
private long GetNewIndex() => Interlocked.Increment(ref _index);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
T value, ISourceBlock<T> source, bool consumeToAccept)
{
var sourceProxy = source != null ?
new SourceProxy(source, this, GetNewIndex) : null;
return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
sourceProxy, consumeToAccept);
}
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<T> target, out bool messageConsumed)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
var (value, index) = _broadcastBlock.ConsumeMessage(header, targetProxy,
out messageConsumed);
return value;
}
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<T> target)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
return _broadcastBlock.ReserveMessage(header, targetProxy);
}
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<T> target)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
_broadcastBlock.ReleaseReservation(header, targetProxy);
}
private class LinkedTargetProxy : ITargetBlock<(T, long)>
{
private readonly ITargetBlock<T> _realTarget;
private readonly ISourceBlock<T> _realSource;
private readonly long _indexLimit;
public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
long indexLimit)
{
_realTarget = realTarget;
_realSource = realSource;
_indexLimit = indexLimit;
}
DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
DataflowMessageHeader header, (T, long) messageValue,
ISourceBlock<(T, long)> source, bool consumeToAccept)
{
var (value, index) = messageValue;
if (index <= _indexLimit) return DataflowMessageStatus.Declined;
return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => _realTarget.Complete();
void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
}
private class SourceProxy : ISourceBlock<(T, long)>
{
private readonly ISourceBlock<T> _realSource;
private readonly ITargetBlock<T> _realTarget;
private readonly Func<long> _getNewIndex;
public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
Func<long> getNewIndex)
{
_realSource = realSource;
_realTarget = realTarget;
_getNewIndex = getNewIndex;
}
(T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<(T, long)> target, out bool messageConsumed)
{
var value = _realSource.ConsumeMessage(header, _realTarget,
out messageConsumed);
var newIndex = _getNewIndex();
return (value, newIndex);
}
bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<(T, long)> target)
{
return _realSource.ReserveMessage(header, _realTarget);
}
void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<(T, long)> target)
{
_realSource.ReleaseReservation(header, _realTarget);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
DataflowLinkOptions linkOptions) => throw new NotSupportedException();
}
private class TargetProxy : ITargetBlock<(T, long)>
{
private readonly ITargetBlock<T> _realTarget;
private readonly ISourceBlock<T> _realSource;
public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
{
_realTarget = realTarget;
_realSource = realSource;
}
DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
DataflowMessageHeader header, (T, long) messageValue,
ISourceBlock<(T, long)> source, bool consumeToAccept)
{
var (value, index) = messageValue;
return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
}
}
推荐阅读
- javascript - 使用 ng2 smart tbl 显示从 API 获取的数据
- django - 使用管理员权限,如何在没有密码的情况下验证或登录用户帐户?
- java - EJB 使用 POJO 作为远程
- angular - Angular 7不会路由到新页面,但组件只会显示在前一个组件下
- ionic-framework - Ionic 4 - 离子修复后构建 apk 大小变为原始大小的一半
- python - 我如何补偿假期 - 熊猫
- r - 如何从 r 中打开的 .txt 文件中选择链接(行)?
- android - Firebase身份验证不成功android
- android - 无法将 1.13 mb apk 上传到 google play 商店,它需要 aab 文件
- excel - 根据条件着色单元格(单元格值包含小数百分比)