首页 > 解决方案 > 如何为在断路的所有重试时调用的断路器进行回退

问题描述

我有以下政策:

var sharedBulkhead = Policy.BulkheadAsync(
            maxParallelization: maxParallelizations, 
            maxQueuingActions: maxQueuingActions,
            onBulkheadRejectedAsync: (context) =>
            {
                Log.Info($"Bulk head rejected => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                return TaskHelper.EmptyTask;
            }
        );

var retryPolicy = Policy.Handle<HttpRequestException>()
.Or<BrokenCircuitException>().WaitAndRetryAsync(
            retryCount: maxRetryCount,
            sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
            onRetryAsync: (exception, calculatedWaitDuration, retryCount, context) =>
            {
                Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
                return TaskHelper.EmptyTask;
            });

            var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException)).CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: maxExceptionsBeforeBreaking, 
            durationOfBreak: TimeSpan.FromSeconds(circuitBreakDurationSeconds), 
            onBreak: (exception, timespan, context) =>
            {
                Log.Error($"Circuit broken => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}");
            },
            onReset: (context) =>
            {
                Log.Info($"Circuit reset => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
            }
        );

var fallbackForCircuitBreaker = Policy<bool>
         .Handle<BrokenCircuitException>()
         .FallbackAsync(
             fallbackValue: false,
             onFallbackAsync: (b, context) =>
             {
                 Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                 return TaskHelper.EmptyTask;
             }
         );

var fallbackForAnyException = Policy<bool>
            .Handle<Exception>()
            .FallbackAsync(
                fallbackAction: (ct, context) => { return Task.FromResult(false); },
                onFallbackAsync: (e, context) =>
                {
                    Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                    return TaskHelper.EmptyTask;
                }
            );


var resilienceStrategy = Policy.WrapAsync(retryPolicy, circuitBreaker, sharedBulkhead);
        var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));

现在,fallbackForCircuitBreaker仅在所有重试失败并且最后一次重试失败时调用BrokenCircuitException. fallbackForCircuitBreaker每次对断路进行重试时,应该进行哪些更改才能调用?

另外,我使用的sharedBulkHead是服务中的实例字段,并在构造函数中初始化。这是一个好习惯吗?理想情况下要做什么onBulkheadRejectedAsync?我可以修改重试策略来处理批量拒绝吗?

标签: c#asp.netpolly

解决方案


现在,fallbackForCircuitBreaker 仅在所有重试失败并且最后一次重试失败并出现 BrokenCircuitException 时才被调用。每次对断路进行重试时,应该进行哪些更改才能调用 fallbackForCircuitBreaker?

请参阅PolicyWrap 文档,尤其是操作图和说明。PolicyWrap 中的策略就像调用周围的一系列嵌套中间件:

  • 最外层(按阅读顺序最左边)策略执行下一个内层,后者执行下一个内层,以此类推;直到最里面的策略执行用户委托;
  • 抛出的异常通过层向外冒泡(直到处理)

fallbackForCircuitBreaker因此,要每次尝试都调用(等效于) ,请将其移动到重试策略中。

然而,当前fallbackForCircuitBreaker将抛出的异常替换为返回值false,而听起来您从每次尝试的回退中寻求的是“日志,然后进行下一次尝试”。技术是使用 fallback 作为log 然后 rethrow,这样你的重试策略仍然可以响应(rethrow)异常。所以:

var sharedBulkhead = /* as before */;
var retryPolicy = /* as before */;
var fallbackForCircuitBreaker = /* as before */;
var logCircuitBreakerBrokenPerTry = Policy<bool>
     .Handle<BrokenCircuitException>()
     .FallbackAsync(
         fallbackValue: false,
         onFallbackAsync: (outcome, context) =>
         {
             Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
             throw outcome.Exception;
         }
     );
var fallbackForAnyException = /* as before */;

var resilienceStrategy = Policy.WrapAsync(retryPolicy, logCircuitBreakerBrokenPerTry, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));

我可以修改重试策略来处理批量拒绝吗?

Polly 舱壁文档说明了策略 throws ,因此BulkheadRejectedException

var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .Or<BrokenCircuitException>()
    .Or<BulkheadRejectedException>()
    /* etc */

理想情况下在 onBulkheadRejectedAsync 上要做什么?

你可以登录。从广义上讲,您可以摆脱多余的负载,或者使用隔板拒绝作为触发器来水平扩展以增加容量。Polly 文档在此处此处提供了更多讨论。

另外,我正在使用 sharedBulkHead,它是服务中的一个实例字段,并在构造函数中初始化。这是一个好习惯吗?

这取决于。为了使 Bulkhead 策略实例的状态能够控制并发执行的调用数量, Bulkhead 策略实例的生命周期必须在受管控的调用中(而不是每次调用)具有较长的生命周期。

  • 如果服务作为长寿命单例存在,则在实例字段中保存隔板策略将是合适的,因为隔板策略实例也将是长寿命的。
  • 如果服务类的实例是由 DI 容器作为瞬态/按请求创建的,则您需要确保隔板策略实例仍然长期存在并在并发请求之间共享(例如通过 make it static),而不是按请求。
  • 如果 HttpClientFactory 在 HttpClient 上配置了隔板实例,请遵循Polly 和 HttpClientFactory 文档中关于使用 HttpClientFactory 确定有状态策略范围的说明

推荐阅读