首页 > 解决方案 > 为并行 API 调用添加延迟

问题描述

我正在使用 Polly 进行并行 API 调用。但是服务器每秒不能处理超过 25 个调用,所以我想知道是否有办法在每批 25 个调用之后添加 1s 延迟?

var policy = Policy
    .Handle<HttpRequestException>()
    .RetryAsync(3);

foreach (var mediaItem in uploadedMedia)
{
    var mediaRequest = new HttpRequestMessage { *** }
    async Task<string> func()
    {
        var response = await client.SendAsync(mediaRequest);
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);

我根据下面的建议添加了一个计数,但似乎不起作用

foreach (var mediaItem in uploadedMedia.Items)
{
    var mediaRequest = new HttpRequestMessage
    {
        RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),
        Method = HttpMethod.Get,
        Headers = {
            { "id-token", id_Token },
            { "access-token", access_Token }
        }
    };

    async Task<string> func()
    {
        if (count == 24)
        {
            Thread.Sleep(1000);
            count = 0;
        }
        var response = await client.SendAsync(mediaRequest);
        count++;
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}

await Task.WhenAll(tasks);

foreach (var t in tasks)
{
    var postResponse = await t;
    urls.Add(postResponse);
}

标签: c#concurrencyasync-awaitpolly

解决方案


有很多方法可以做到这一点,但是编写一个简单的线程安全可重用异步速率限制器相当容易。

异步方法的优点是,它不会阻塞线程池线程,它相当高效,并且可以在现有的异步工作流和管道(如 TPL Dataflow 和 Reactive Extensions)中很好地工作。

例子

// 3 calls every 3 seconds as an example
var rateLimiter = new RateLimiter(3, TimeSpan.FromSeconds(3));

// create some work
var task1 = Task.Run(async () =>
   {
      for (var i = 0; i < 5; i++)
      {
         await rateLimiter.WaitAsync();
         Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
      }
   }

);
var task2 = Task.Run(async () =>
   {
      for (var i = 0; i < 5; i++)
      {
         await rateLimiter.WaitAsync();
         Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
      }
   }

);
await Task.WhenAll(task1, task2);

输出

4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:15
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
4 : 10/25/2020 05:16:24

完整的演示在这里

用法

private RateLimiter _rateLimiter = new RateLimiter(25 , TimeSpan.FromSeconds(1));

...

async Task<string> func()
{
    await _rateLimiter.WaitAsync();

    var response = await client.SendAsync(mediaRequest);
    return await response.Content.ReadAsStringAsync();
}

班级

public class RateLimiter
{
   private readonly CancellationToken _cancellationToken;
   private readonly TimeSpan _timeSpan;
   private bool _isProcessing;
   private readonly int _count;
   private readonly Queue<DateTime> _completed = new Queue<DateTime>();
   private readonly Queue<TaskCompletionSource<bool>> _waiting = new Queue<TaskCompletionSource<bool>>();
   private readonly object _sync = new object();

   public RateLimiter(int count, TimeSpan timeSpan, CancellationToken cancellationToken = default)
   {
      _count = count;
      _timeSpan = timeSpan;
      _cancellationToken = cancellationToken;
   }

   private void Cleanup()
   {
      // if the cancellation  was request, we need to throw on all waiting items
      while (_cancellationToken.IsCancellationRequested && _waiting.Any())
         if (_waiting.TryDequeue(out var item))
            item.TrySetCanceled();

      _waiting.Clear();
      _completed.Clear();

      _isProcessing = false;
   }

   private async Task ProcessAsync()
   {
      try
      {
         while (true)
         {

            _cancellationToken.ThrowIfCancellationRequested();
            var time = DateTime.Now - _timeSpan;

            lock (_sync)
            {
               // remove anything out of date from the queue
               while (_completed.Any() && _completed.Peek() < time)
                  _completed.TryDequeue(out _);

               // signal waiting tasks to process
               while (_completed.Count < _count && _waiting.Any())
               {
                  if (_waiting.TryDequeue(out var item))
                     item.TrySetResult(true);
                  _completed.Enqueue(DateTime.Now);
               }

               if (!_waiting.Any() && !_completed.Any())
               {
                  Cleanup();
                  break;
               }
            }

            var delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);

            if (delay.Ticks > 0)
               await Task.Delay(delay, _cancellationToken);
            Console.WriteLine(delay);
         }
      }
      catch (OperationCanceledException)
      {
         lock (_sync)
            Cleanup();
      }
   }

   public ValueTask WaitAsync()
   {
      // ReSharper disable once InconsistentlySynchronizedField
      _cancellationToken.ThrowIfCancellationRequested();

      lock (_sync)
      {
         try
         {
            if (_completed.Count < _count && !_waiting.Any())
            {
               _completed.Enqueue(DateTime.Now);
               return new ValueTask();
            }

            var tcs = new TaskCompletionSource<bool>();
            _waiting.Enqueue(tcs);
            return new ValueTask(tcs.Task);
         }
         finally
         {
            if (!_isProcessing)
            {
               _isProcessing = true;
               _ = ProcessAsync();
            }
         }
      }
   }
}

注意 1:最好以最大程度的并行度使用它

注2:虽然我对此进行了测试,但我只是为这个答案写了它作为一个新颖的解决方案


推荐阅读