c# - 阻止 TPL 数据流处理
问题描述
我订阅了实时数据馈送,并根据接收到的数据维护状态。通常,所有数据都是按顺序接收的,但在删除消息的情况下,我会缓冲消息,通过 REST API 接收状态快照,然后回放缓冲区,跳过前面有 Id 的任何消息快照中指定的一个。目前,我正在执行以下操作:
class StateManager
{
private long _lastId;
private bool _isSyncing;
private object _syncLock;
private Dictionary<decimal,decimal> _state;
private ConcurrentQueue<SocketMessage> _messageBuffer;
private ManualResetEvent _messageEvent;
private ManualResetEvent _processingEvent;
public StateManager( DataSocket socket )
{
_isSyncing = false;
_syncLock = new object();
_state = new Dictionary<decimal,decimal>();
_messageBuffer = new ConcurrentQueue<SocketMessage>();
socket.OnMessage += OnSocketMessage;
Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
}
public void ApplySnapshot( Snapshot snapshot )
{
lock( _syncLock )
{
if( _isSyncing ) return;
_isSyncing = true;
_processingEvent.Reset();
}
// Apply the snapshot to the state...
_isSyncing = false;
_processingEvent.Set();
}
private void OnSocketMessage( object sender, SocketMessage msg )
{
_messageBuffer.Enqueue( msg );
_messageEvent.Set();
}
private async Task MessageProcessingThread()
{
while(true)
{
_messageEvent.WaitOne();
while(true)
{
_processingEvent.WaitOne();
if( !_messageBuffer.TryDequeue( out var msg ) )
{
_messageEvent.Reset();
break;
}
ApplyToState( msg );
}
}
}
}
这很好用,但我觉得它有点草率,并且在重负载下可以表现得更好。因此,我正在考虑过渡到Microsoft.Tpl.Dataflow
,它将为我处理排队和处理执行。但是,我之前使用过Dataflow,我有一个担心:
有没有一种方法可以暂停执行,ActionBlock
以便缓冲新任务但在我恢复之前不处理它们?在我检测到丢失消息的情况下,我需要能够暂停处理直到应用新的快照,然后恢复并处理所有缓冲的消息。
我可以在_processingEvent
里面使用ActionBlock
,但我觉得这会导致一堆问题。首先,它会阻塞任务,导致更多任务被启动,而这些任务会阻塞,迅速填满 TPL 的内部任务队列。此外,它会导致所有被阻止的任务同时完成,可能会出现乱序,从而导致另一个重新同步事件发生。
如果 TPL 无法做到这一点,有没有更好的方法来解决这个问题?
解决方案
推荐阅读
- reactjs - 使用 redux 工具包反应 redux 状态管理
- unity3d - Unity MRTK:地形交互
- jupyter-notebook - 是否可以选择在 qgrid 小部件中过滤的所有行?
- excel - IFERROR 添加到公式
- office-js - 带有 Angular CLI 8.2 的 Office Word 插件 - 出现未处理的导航错误
- flutter - 'showSnackBar' 和 'flatbutton' 已弃用,不应使用
- html - 引导背景图像更改为视频
- java - JPA Repository findAll where creationDate 超过设定时间
- kubernetes - Pod 的命令和参数什么时候执行
- swift - 如何确定 PKDrawing 是否为空白?