c# - 如何为在断路的所有重试时调用的断路器进行回退
问题描述
我有以下政策:
var sharedBulkhead = Policy.BulkheadAsync(
maxParallelization: maxParallelizations,
maxQueuingActions: maxQueuingActions,
onBulkheadRejectedAsync: (context) =>
{
Log.Info($"Bulk head rejected => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var retryPolicy = Policy.Handle<HttpRequestException>()
.Or<BrokenCircuitException>().WaitAndRetryAsync(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetryAsync: (exception, calculatedWaitDuration, retryCount, context) =>
{
Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
return TaskHelper.EmptyTask;
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException)).CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: maxExceptionsBeforeBreaking,
durationOfBreak: TimeSpan.FromSeconds(circuitBreakDurationSeconds),
onBreak: (exception, timespan, context) =>
{
Log.Error($"Circuit broken => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}");
},
onReset: (context) =>
{
Log.Info($"Circuit reset => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
.FallbackAsync(
fallbackValue: false,
onFallbackAsync: (b, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.FallbackAsync(
fallbackAction: (ct, context) => { return Task.FromResult(false); },
onFallbackAsync: (e, context) =>
{
Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var resilienceStrategy = Policy.WrapAsync(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));
现在,fallbackForCircuitBreaker
仅在所有重试失败并且最后一次重试失败时调用BrokenCircuitException
. fallbackForCircuitBreaker
每次对断路进行重试时,应该进行哪些更改才能调用?
另外,我使用的sharedBulkHead
是服务中的实例字段,并在构造函数中初始化。这是一个好习惯吗?理想情况下要做什么onBulkheadRejectedAsync
?我可以修改重试策略来处理批量拒绝吗?
解决方案
现在,fallbackForCircuitBreaker 仅在所有重试失败并且最后一次重试失败并出现 BrokenCircuitException 时才被调用。每次对断路进行重试时,应该进行哪些更改才能调用 fallbackForCircuitBreaker?
请参阅PolicyWrap 文档,尤其是操作图和说明。PolicyWrap 中的策略就像调用周围的一系列嵌套中间件:
- 最外层(按阅读顺序最左边)策略执行下一个内层,后者执行下一个内层,以此类推;直到最里面的策略执行用户委托;
- 抛出的异常通过层向外冒泡(直到处理)
fallbackForCircuitBreaker
因此,要每次尝试都调用(等效于) ,请将其移动到重试策略中。
然而,当前fallbackForCircuitBreaker
将抛出的异常替换为返回值false
,而听起来您从每次尝试的回退中寻求的是“日志,然后进行下一次尝试”。技术是使用 fallback 作为log 然后 rethrow,这样你的重试策略仍然可以响应(rethrow)异常。所以:
var sharedBulkhead = /* as before */;
var retryPolicy = /* as before */;
var fallbackForCircuitBreaker = /* as before */;
var logCircuitBreakerBrokenPerTry = Policy<bool>
.Handle<BrokenCircuitException>()
.FallbackAsync(
fallbackValue: false,
onFallbackAsync: (outcome, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
throw outcome.Exception;
}
);
var fallbackForAnyException = /* as before */;
var resilienceStrategy = Policy.WrapAsync(retryPolicy, logCircuitBreakerBrokenPerTry, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));
我可以修改重试策略来处理批量拒绝吗?
Polly 舱壁文档说明了策略 throws ,因此BulkheadRejectedException
:
var retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<BrokenCircuitException>()
.Or<BulkheadRejectedException>()
/* etc */
理想情况下在 onBulkheadRejectedAsync 上要做什么?
你可以登录。从广义上讲,您可以摆脱多余的负载,或者使用隔板拒绝作为触发器来水平扩展以增加容量。Polly 文档在此处和此处提供了更多讨论。
另外,我正在使用 sharedBulkHead,它是服务中的一个实例字段,并在构造函数中初始化。这是一个好习惯吗?
这取决于。为了使 Bulkhead 策略实例的状态能够控制并发执行的调用数量, Bulkhead 策略实例的生命周期必须在受管控的调用中(而不是每次调用)具有较长的生命周期。
- 如果服务作为长寿命单例存在,则在实例字段中保存隔板策略将是合适的,因为隔板策略实例也将是长寿命的。
- 如果服务类的实例是由 DI 容器作为瞬态/按请求创建的,则您需要确保隔板策略实例仍然长期存在并在并发请求之间共享(例如通过 make it
static
),而不是按请求。 - 如果 HttpClientFactory 在 HttpClient 上配置了隔板实例,请遵循Polly 和 HttpClientFactory 文档中关于使用 HttpClientFactory 确定有状态策略范围的说明。
推荐阅读
- c++ - 我需要在我的 C++ 代码中使用向量或学习 STL
- javascript - 如何让 VsCode 从 es6 模块正确自动导入?
- php - 尝试合并两个表导致单个数组出现错误无法使用 [] 进行读取
- javascript - 音频[data-key="${event.keyCode}"])。解释这段代码
- r - 将表格类型对象导出到电子表格
- python - 使用 python 列表查找和替换
- unit-testing - 在调试检查相等性的合同违规时如何获取更多信息
- swift - SwiftUI 使用 ForEach 重新排序动画不适用于索引
- javascript - 为什么 .pop() 返回整个数组
- database - 如何测试 Spring 数据库?