spring-integration - Spring Integration - http outboundAdapter 中的控制重试逻辑
问题描述
我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(原因 java.net.UnknownHostException)时,我无法控制重试。
即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑,这似乎会生成无限重试(RabbitMQ 容器上未确认消息)。
我的目标应该是:在http出站适配器出错的情况下将消息重新排队N次,否则丢弃消息并且不要再次重新排队。
代码在这里:
Spring集成路线
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
return IntegrationFlows
.from(amqpInboundChannelAdapterSMLCSpec)
.filter(validJsonFilter())
.enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
.enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
.enrichHeaders(h -> h.header("Content-Type", "application/json"))
.handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl) .mappedRequestHeaders("X-Insert-Key")
.httpMethod(HttpMethod.POST)
)
.get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
RetryTemplate retryTemplate = new RetryTemplate();
exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
retryTemplate.setThrowLastExceptionOnExhausted(true);
return Amqp
.inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
.configureContainer(c -> c
.concurrentConsumers(3)
.maxConcurrentConsumers(5)
.receiveTimeout(2000)
.alwaysRequeueWithTxManagerRollback(false)
)
.retryTemplate(retryTemplate);
}
有任何想法吗?
非常感谢
解决方案
如果http出站适配器出错,则将消息重新排队N次,否则丢弃该消息并且不再重新排队。
当您在 AMQP MessageListenerContainer 上使用重试时,会出现重新排队:重试在内存中完成,无需往返代理。
无论如何,您到目前为止所做的一切都还可以。只有您缺少的是RejectAndDontRequeueRecoverer
配置它Amqp.inboundAdapter()
来决定在所有重试尝试都用尽时如何处理 AMQP 消息。
不幸的是MessageRecoverer
,通道适配器的直接配置已添加自版本5.5
:https ://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp .
对于当前版本,它必须通过recoveryCallback(RecoveryCallback<?> recoveryCallback)
选项和相应的委托来完成:
.recoveryCallback(context -> {
org.springframework.amqp.core.Message messageToReject =
(org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(context.getLastThrowable()), messageToReject);
}))
推荐阅读
- python - RGB 到 YIQ 并返回 python
- javascript - 如何在函数中创建模态页面?
- snappy - Laravel Snappy:多余的页面
- gdb - GDB“查找”命令提前终止
- c - 处理 C 中的双重加法/减法错误
- logstash-grok - 创建自定义 grok 模式
- python - Django获取最大分组数据的值
- tensorflow - Tensorflow 2 timeseries_dataset_from_array 输入与目标批次形状差异
- node.js - Nodejs,无服务器预期存根函数被调用一次但被调用0次
- angular - 如何使用Angular9中较大的项目数组中的默认值执行枚举垫选择选项?