首页 > 解决方案 > 阻止 TPL 数据流处理

问题描述

我订阅了实时数据馈送,并根据接收到的数据维护状态。通常,所有数据都是按顺序接收的,但在删除消息的情况下,我会缓冲消息,通过 REST API 接收状态快照,然后回放缓冲区,跳过前面有 Id 的任何消息快照中指定的一个。目前,我正在执行以下操作:

class StateManager
{
  private long _lastId;
  private bool _isSyncing;
  private object _syncLock;

  private Dictionary<decimal,decimal> _state;  
  private ConcurrentQueue<SocketMessage> _messageBuffer;

  private ManualResetEvent _messageEvent;
  private ManualResetEvent _processingEvent;

  public StateManager( DataSocket socket )
  {
    _isSyncing = false;
    _syncLock = new object();

    _state = new Dictionary<decimal,decimal>();
    _messageBuffer = new ConcurrentQueue<SocketMessage>();

    socket.OnMessage += OnSocketMessage;
    Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
  }

  public void ApplySnapshot( Snapshot snapshot )
  {
    lock( _syncLock )
    {
      if( _isSyncing ) return;

      _isSyncing = true;
      _processingEvent.Reset();
    }

    // Apply the snapshot to the state...

    _isSyncing = false;
    _processingEvent.Set();
  }

  private void OnSocketMessage( object sender, SocketMessage msg )
  {
    _messageBuffer.Enqueue( msg );
    _messageEvent.Set();
  }

  private async Task MessageProcessingThread()
  {
    while(true)
    {
      _messageEvent.WaitOne();
      while(true)
      {
        _processingEvent.WaitOne();
        if( !_messageBuffer.TryDequeue( out var msg ) )
        {
          _messageEvent.Reset();
          break;
        }
        ApplyToState( msg );
      }

    }
  }
}

这很好用,但我觉得它有点草率,并且在重负载下可以表现得更好。因此,我正在考虑过渡到Microsoft.Tpl.Dataflow,它将为我处理排队和处理执行。但是,我之前使用过Dataflow,我有一个担心:

有没有一种方法可以暂停执行,ActionBlock以便缓冲新任务但在我恢复之前不处理它们?在我检测到丢失消息的情况下,我需要能够暂停处理直到应用新的快照,然后恢复并处理所有缓冲的消息。

我可以在_processingEvent里面使用ActionBlock,但我觉得这会导致一堆问题。首先,它会阻塞任务,导致更多任务被启动,而这些任务会阻塞,迅速填满 TPL 的内部任务队列。此外,它会导致所有被阻止的任务同时完成,可能会出现乱序,从而导致另一个重新同步事件发生。

如果 TPL 无法做到这一点,有没有更好的方法来解决这个问题?

标签: c#task-parallel-librarydataflow

解决方案


推荐阅读