c# - 使用 HttpClient 和 Polly 发送并行请求,但每个主机只能发送一个请求,以优雅地处理 429 响应
问题描述
介绍:
我正在构建一个单节点网络爬虫来简单地验证 URL 是否200 OK
在 .NET Core 控制台应用程序中。我在不同的主机上有一组 URL,我用HttpClient
. 我对使用 Polly 和 TPL 数据流还很陌生。
要求:
- 我想支持与可配置的
MaxDegreeOfParallelism
. - 我想将对任何给定主机的并行请求数限制为 1(或可配置)。这是为了优雅地
429 TooManyRequests
使用 Polly 策略处理每个主机的响应。或者,我可以使用断路器在收到一个429
响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机? - 我完全可以完全不使用 TPL 数据流,而可能使用 Polly Bulkhead 或其他一些机制来限制并行请求,但我不确定为了实现要求 2,该配置会是什么样子。
当前实施:
我当前的实现是有效的,除了我经常看到我会对x
同一主机的并行请求429
大约在同一时间返回......然后,他们都为重试策略暂停......然后,他们都猛击同一个主机再次同时经常仍然收到429
s。即使我将同一主机的多个实例均匀地分布在整个队列中,我的 URL 集合也会被一些429
最终仍开始生成 s 的特定主机超重。
收到a后429
,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200
s。
验证器方法:
public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
var validator = new TransformBlock<Uri, bool>(
async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode,
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}
);
foreach (var url in urls)
await validator.SendAsync(url, cancellationToken);
validator.Complete();
var validUrlCount = 0;
while (await validator.OutputAvailableAsync(cancellationToken))
{
if(await validator.ReceiveAsync(cancellationToken))
validUrlCount++;
}
await validator.Completion;
return validUrlCount;
}
Polly 策略应用于GetValidCount()
上面使用的 HttpClient 实例。
IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
.HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(3,
(retryCount, response, context) =>
response.Result?.Headers.RetryAfter.Delta ?? TimeSpan.FromMilliseconds(120),
async (response, timespan, retryCount, context) =>
{
// log stuff
});
问题:
如何修改或替换此解决方案以增加对要求 2 的满意度?
解决方案
I'd try to introduce some sort of a flag LimitedMode
to detect that this particular client is entered in limited mode. Below I declare two policies - one simple retry policy just to catch TooManyRequests and set the flag. The second policy is a out-of-the-box BulkHead
policy.
public void ConfigureServices(IServiceCollection services)
{
/* other configuration */
var registry = services.AddPolicyRegistry();
var catchPolicy = Policy.HandleResult<HttpResponseMessage>(r =>
{
LimitedMode = r.StatusCode == HttpStatusCode.TooManyRequests;
return false;
})
.WaitAndRetryAsync(1, i => TimeSpan.FromSeconds(3));
var bulkHead = Policy.BulkheadAsync<HttpResponseMessage>(1, 10, OnBulkheadRejectedAsync);
registry.Add("catchPolicy", catchPolicy);
registry.Add("bulkHead", bulkHead);
services.AddHttpClient<CrapyWeatherApiClient>((client) =>
{
client.BaseAddress = new Uri("hosturl");
}).AddPolicyHandlerFromRegistry(PolicySelector);
}
Then you may want to dynamically decide on which policy to apply using the PolicySelector
mechanism: in case the limited mode is active - wrap bulk head policy with catch 429 policy. If the success status code received - switch back to regular mode without a bulkhead.
private IAsyncPolicy<HttpResponseMessage> PolicySelector(IReadOnlyPolicyRegistry<string> registry, HttpRequestMessage request)
{
var catchPolicy = registry.Get<IAsyncPolicy<HttpResponseMessage>>("catchPolicy");
var bulkHead = registry.Get<IAsyncPolicy<HttpResponseMessage>>("bulkHead");
if (LimitedMode)
{
return catchPolicy.WrapAsync(bulkHead);
}
return catchPolicy;
}
推荐阅读
- python - typeerror: __call__() 接受 2 个位置参数,但给出了 3 个(Gunicorn)
- java - Java程序没有产生正确的结果
- java - `输入活动编号:线程“主”java.util.NoSuchElementException中的异常
- .htaccess - 使用 .htaccess 从 URL 中删除“/php/”子目录
- html - CSS 内容溢出 Div
- audio - 从音频文件中删除人声的算法
- openstack - !!开发堆栈!!setup.py 在运行 pbr 时遇到错误
- ios - PencilKit 清除画布并不总是有效
- python - ModuleNotFoundError:没有名为“sklearn.ensemble.forest”的模块
- java - 没有找到“folioreader”的变体