Send parallel requests with HttpClient and Polly, but only one request per host, to gracefully handle 429 responses

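Question

Introduction:

I am building a single-node web crawler, as a .NET Core console application, that simply verifies that URLs return 200 OK. I have a collection of URLs on different hosts, which I request with HttpClient. I am fairly new to Polly and TPL Dataflow.

Requirements:

  1. I want to support parallel requests with a configurable MaxDegreeOfParallelism.
  2. I want to limit the number of parallel requests to any given host to 1 (or a configurable number). This is so that per-host 429 TooManyRequests responses can be handled gracefully with a Polly policy. Alternatively, could I use a circuit breaker to cancel concurrent requests to the same host as soon as one 429 response arrives, and then proceed one at a time for that specific host?
  3. I am perfectly fine with dropping TPL Dataflow altogether and using a Polly Bulkhead or some other mechanism to throttle parallel requests, but I am not sure what that configuration would look like in order to meet requirement 2.

Current implementation:

My current implementation works, except that I often see x parallel requests to the same host return 429 at about the same time. Then they all pause for the retry policy. Then they all slam the same host again at the same time, often still receiving 429s. Even if I spread multiple instances of the same host evenly throughout the queue, my URL collection is overweighted with a few specific hosts that still start generating 429s eventually.

After receiving a 429, I think I only want to send one concurrent request to that host from then on, to respect the remote host and pursue 200s.

Validator method: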

public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
    var validator = new TransformBlock<Uri, bool>(
        async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode,
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}
    );
    foreach (var url in urls)
        await validator.SendAsync(url, cancellationToken);
    validator.Complete();
    var validUrlCount = 0;
    while (await validator.OutputAvailableAsync(cancellationToken))
    {
        if(await validator.ReceiveAsync(cancellationToken))
            validUrlCount++;
    }
    await validator.Completion;
    return validUrlCount;
}

The following Polly policy is applied to the HttpClient instance used in GetValidCount() above.

IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
    .HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(3,
        (retryCount, response, context) =>
            response.Result?.Headers.RetryAfter.Delta ?? TimeSpan.FromMilliseconds(120),
        async (response, timespan, retryCount, context) =>
        {
            // log stuff
        });

Question:

How can I modify or replace this solution so that it also satisfies requirement 2?

Tags: c#, .net-core, web-crawler, tpl-dataflow, polly

Solution

I'd try introducing a LimitedMode flag to record that this particular client has entered limited mode. Below I declare two policies: a simple retry policy whose only job is to catch TooManyRequests and set the flag, and an out-of-the-box Bulkhead policy.

    public void ConfigureServices(IServiceCollection services)
    {
        /* other configuration */

        var registry = services.AddPolicyRegistry();

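        // The predicate always returns false, so this policy never actually retries;
        // it only records whether the latest response was a 429.
        var catchPolicy = Policy.HandleResult<HttpResponseMessage>(r =>
            {
                LimitedMode = r.StatusCode == HttpStatusCode.TooManyRequests;
                return false;
            })
            .WaitAndRetryAsync(1, i => TimeSpan.FromSeconds(3));

        // At most 1 concurrent execution, with up to 10 more waiting in the queue.
        var bulkHead = Policy.BulkheadAsync<HttpResponseMessage>(1, 10, OnBulkheadRejectedAsync);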

        registry.Add("catchPolicy", catchPolicy);
        registry.Add("bulkHead", bulkHead);

        services.AddHttpClient<CrapyWeatherApiClient>((client) =>
        {
            client.BaseAddress = new Uri("hosturl");
        }).AddPolicyHandlerFromRegistry(PolicySelector);
    }
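
The OnBulkheadRejectedAsync callback passed to the bulkhead is not shown in the answer. A minimal sketch of what it might look like, assuming Polly's Func<Context, Task> callback shape; the class name BulkheadCallbacks and the log message are hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using Polly;

public static class BulkheadCallbacks
{
    // Hypothetical callback: Polly invokes it when the bulkhead's execution
    // slots and its queue are both full and an execution is rejected.
    public static Task OnBulkheadRejectedAsync(Context context)
    {
        // Placeholder logging; a real application would use its own logger.
        Console.WriteLine("Bulkhead rejected a request: slots and queue are full.");
        return Task.CompletedTask;
    }
}
```

With this shape the policy would be created with BulkheadCallbacks.OnBulkheadRejectedAsync as the third argument. The rejected call itself fails with a BulkheadRejectedException, so the caller still has to handle that exception.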

Then you may want to decide dynamically which policy to apply, using the PolicySelector mechanism: while limited mode is active, wrap the bulkhead policy with the catch-429 policy. Once a success status code is received, switch back to regular mode without the bulkhead.

    private IAsyncPolicy<HttpResponseMessage> PolicySelector(IReadOnlyPolicyRegistry<string> registry, HttpRequestMessage request)
    {
        var catchPolicy = registry.Get<IAsyncPolicy<HttpResponseMessage>>("catchPolicy");
        var bulkHead = registry.Get<IAsyncPolicy<HttpResponseMessage>>("bulkHead");
        if (LimitedMode)
        {
            return catchPolicy.WrapAsync(bulkHead);
        }

        return catchPolicy;
    }        
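
LimitedMode above is a single flag, which fits a client bound to one host. For a crawler that shares one HttpClient across many hosts, as in the question, one alternative is a plain SemaphoreSlim gate keyed by host instead of a Polly policy. The following is a sketch only; PerHostThrottlingHandler and maxPerHost are hypothetical names, not part of Polly or HttpClient:

```csharp
using System;
using System.Collections.Concurrent;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler: allows at most maxPerHost in-flight requests per host.
public sealed class PerHostThrottlingHandler : DelegatingHandler
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _gates =
        new ConcurrentDictionary<string, SemaphoreSlim>(StringComparer.OrdinalIgnoreCase);
    private readonly int _maxPerHost;

    public PerHostThrottlingHandler(int maxPerHost, HttpMessageHandler innerHandler)
        : base(innerHandler)
    {
        if (maxPerHost < 1) throw new ArgumentOutOfRangeException(nameof(maxPerHost));
        _maxPerHost = maxPerHost;
    }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // One semaphore per host, so requests to different hosts never block each other.
        var host = request.RequestUri?.Host ?? string.Empty;
        var gate = _gates.GetOrAdd(host, _ => new SemaphoreSlim(_maxPerHost, _maxPerHost));

        await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Release the slot even if the request throws or is cancelled.
            gate.Release();
        }
    }
}
```

A client could then be built as new HttpClient(new PerHostThrottlingHandler(1, new HttpClientHandler())). If a retry policy sits outside this handler in the pipeline, the wait between retries does not hold the host's slot; whether that is desirable depends on how strictly the one-at-a-time rule should apply.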
