c# - 为并行 API 调用添加延迟
问题描述
我正在使用 Polly 进行并行 API 调用。但是服务器每秒不能处理超过 25 个调用,所以我想知道是否有办法在每批 25 个调用之后添加 1s 延迟?
var policy = Policy
.Handle<HttpRequestException>()
.RetryAsync(3);
foreach (var mediaItem in uploadedMedia)
{
var mediaRequest = new HttpRequestMessage { *** }
async Task<string> func()
{
var response = await client.SendAsync(mediaRequest);
return await response.Content.ReadAsStringAsync();
}
tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);
我根据下面的建议添加了一个计数,但似乎不起作用
foreach (var mediaItem in uploadedMedia.Items)
{
var mediaRequest = new HttpRequestMessage
{
RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),
Method = HttpMethod.Get,
Headers = {
{ "id-token", id_Token },
{ "access-token", access_Token }
}
};
async Task<string> func()
{
if (count == 24)
{
Thread.Sleep(1000);
count = 0;
}
var response = await client.SendAsync(mediaRequest);
count++;
return await response.Content.ReadAsStringAsync();
}
tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);
foreach (var t in tasks)
{
var postResponse = await t;
urls.Add(postResponse);
}
解决方案
有很多方法可以做到这一点,但是编写一个简单的线程安全可重用异步速率限制器相当容易。
异步方法的优点是,它不会阻塞线程池线程,它相当高效,并且可以在现有的异步工作流和管道(如 TPL Dataflow 和 Reactive Extensions)中很好地工作。
例子
// 3 calls every 3 seconds as an example
var rateLimiter = new RateLimiter(3, TimeSpan.FromSeconds(3));
// create some work
var task1 = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await rateLimiter.WaitAsync();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
}
}
);
var task2 = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await rateLimiter.WaitAsync();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
}
}
);
await Task.WhenAll(task1, task2);
输出
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:15
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
4 : 10/25/2020 05:16:24
用法
private RateLimiter _rateLimiter = new RateLimiter(25 , TimeSpan.FromSeconds(1));
...
async Task<string> func()
{
await _rateLimiter.WaitAsync();
var response = await client.SendAsync(mediaRequest);
return await response.Content.ReadAsStringAsync();
}
班级
public class RateLimiter
{
private readonly CancellationToken _cancellationToken;
private readonly TimeSpan _timeSpan;
private bool _isProcessing;
private readonly int _count;
private readonly Queue<DateTime> _completed = new Queue<DateTime>();
private readonly Queue<TaskCompletionSource<bool>> _waiting = new Queue<TaskCompletionSource<bool>>();
private readonly object _sync = new object();
public RateLimiter(int count, TimeSpan timeSpan, CancellationToken cancellationToken = default)
{
_count = count;
_timeSpan = timeSpan;
_cancellationToken = cancellationToken;
}
private void Cleanup()
{
// if the cancellation was request, we need to throw on all waiting items
while (_cancellationToken.IsCancellationRequested && _waiting.Any())
if (_waiting.TryDequeue(out var item))
item.TrySetCanceled();
_waiting.Clear();
_completed.Clear();
_isProcessing = false;
}
private async Task ProcessAsync()
{
try
{
while (true)
{
_cancellationToken.ThrowIfCancellationRequested();
var time = DateTime.Now - _timeSpan;
lock (_sync)
{
// remove anything out of date from the queue
while (_completed.Any() && _completed.Peek() < time)
_completed.TryDequeue(out _);
// signal waiting tasks to process
while (_completed.Count < _count && _waiting.Any())
{
if (_waiting.TryDequeue(out var item))
item.TrySetResult(true);
_completed.Enqueue(DateTime.Now);
}
if (!_waiting.Any() && !_completed.Any())
{
Cleanup();
break;
}
}
var delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);
if (delay.Ticks > 0)
await Task.Delay(delay, _cancellationToken);
Console.WriteLine(delay);
}
}
catch (OperationCanceledException)
{
lock (_sync)
Cleanup();
}
}
public ValueTask WaitAsync()
{
// ReSharper disable once InconsistentlySynchronizedField
_cancellationToken.ThrowIfCancellationRequested();
lock (_sync)
{
try
{
if (_completed.Count < _count && !_waiting.Any())
{
_completed.Enqueue(DateTime.Now);
return new ValueTask();
}
var tcs = new TaskCompletionSource<bool>();
_waiting.Enqueue(tcs);
return new ValueTask(tcs.Task);
}
finally
{
if (!_isProcessing)
{
_isProcessing = true;
_ = ProcessAsync();
}
}
}
}
}
注意 1:最好以最大程度的并行度使用它。
注2:虽然我对此进行了测试,但我只是为这个答案写了它作为一个新颖的解决方案。
推荐阅读
- javascript - 防止在模板文字中转义反斜杠
- c# - Ajax.beginform 将空值发布到控制器
- python - Dataframe python将一些值从水平重塑为垂直
- python - 在 Eclipse 中使用 Pydev 自动创建类
- python - 下载同名的不同文件 一次又一次地获取上一个文件
- java - Apache 轴与 java 11 的兼容性
- python - 哪个更有效 dict.get("1", "default val") 或者 if else 检查
- c# - Selenium C# 在网页中填充 Windows 安全性
- tensorflow - Keras 指标问题
- javascript - 无法从另一台电脑查看局域网上的 React 应用程序