apache-camel - 如何使用 Camel 和 IBM MQ 配置消费者级别的事务重新传递
问题描述
我正在尝试使用连接到 IBM MQ 的 Apache Camel 在 Java Spring Boot 中完成事务 JMS 客户端。此外,当消息处理失败时,客户端需要应用指数退避重新传递行为。原因:来自 MQ 的消息需要处理并转发到可能停机维护数小时的外部系统。使用事务来保证至少一次处理保证对我来说似乎是合适的解决方案。
我已经研究了这个话题很多小时,但一直没有找到解决方案。我将从我目前拥有的东西开始:
@Bean
UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
throws IOException {
MQConnectionFactory factory = new MQConnectionFactory();
factory.setCCDTURL(tabFilePath);
UserCredentialsConnectionFactoryAdapter adapter =
new UserCredentialsConnectionFactoryAdapter();
adapter.setTargetConnectionFactory(factory);
adapter.setUsername(userName);
bentechConnectionFactoryAdapter.setPassword(password);
return adapter;
}
@Bean
PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
return txMgr;
}
@Bean()
CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,
@Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
return new CamelContextConfiguration() {
@Override
public void beforeApplicationStart(CamelContext context) {
JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter, txMgr);
// required for consumer-level redelivery after rollback
jmsComponent.setCacheLevelName("CACHE_CONSUMER");
jmsComponent.setTransacted(true);
jmsComponent.getConfiguration().setConcurrentConsumers(1);
context.addComponent("jms", jmsComponent);
}
@Override
public void afterApplicationStart(CamelContext camelContext) {
// Do nothing
}
};
}
// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
.transacted()
.("direct:processMessage");
...
我能够通过集成测试验证事务行为。如果在消息处理期间发生未处理的异常,事务将回滚并重试。问题是,它会立即重试,每秒几次,可能会导致 IBM MQ 管理器和外部系统的负载过重。
对于 ActiveMQ,重新投递策略很容易做到,网上有很多例子。ActiveMQConnectionFactory有一个setRedeliveryPolicy方法,这意味着 ActiveMQ 客户端库内置了重新传递逻辑。据我所知,这与 Camel 的Transactional Client EIP 的文档一致,其中指出:
事务模式下的重新交付不是由 Camel 处理的,而是由支持系统(事务管理器)处理的。在这种情况下,您应该求助于支持系统如何配置重新交付。
我绝对想不通的是如何为 IBM MQ 实现同样的目标。IBM 的MQConnectionFactory不支持重新传递策略。事实上,在redeliverypolicy
MQ 知识中心中搜索会准确地找到...击鼓... 0 次点击。我什至浏览了一下 MQConnectionFactory 的实现,也没有发现任何东西。
我研究的另一个支持系统是JmsTransactionManager。搜索“jmstransactionmanager 重新交付策略”或“jmstransactionmanager 指数退避”也没有发现任何有用的信息。有一些讨论TransactionTemplate
,AbstractMessageListenerContainer
但 1)我没有看到与重新交付政策有任何联系,以及 2)我无法弄清楚这些政策如何与 Camel 和 JMS 交互。
Sooo,有人知道如何使用 Apache Camel 和 IBM MQ 实现指数退避重新交付策略吗?
结束语:Camel 支持重新交付策略,errorHandler
并且与事务/连接支持系统中的重新交付策略不同onException
。这些处理程序使用处于任何状态的“Exchange”对象在故障点重试,而不会从路由开始回滚和重新处理消息。事务在整个重试期间保持活动状态,只有当or放弃时才会发生回滚。对于可能持续数小时的重试,这不是我想要的。errorHandler
onException
解决方案
看起来@JoshMc 为我指出了正确的方向。我设法实施了一个RoutePolicy
随着延迟增加而延迟重新交付的方法。我已经运行了几个小时的测试会话和数千次相同消息的重新传递,以查看是否存在内存泄漏、MQ 连接耗尽等问题。我没有观察到任何问题。到 MQ 管理器有两个稳定的 TCP 连接,Java 进程的内存使用在一个很近的范围内移动。
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Session;
import lombok.extern.log4j.Log4j2;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Route;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.support.RoutePolicySupport;
@Log4j2
public class ExponentialBackoffPolicy extends RoutePolicySupport implements CamelContextAware {
final static String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
private CamelContext camelContext;
@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
@Override
public CamelContext getCamelContext() {
return this.camelContext;
}
@Override
public void onExchangeDone(Route route, Exchange exchange) {
try {
// ideally we would check if the exchange is transacted but onExchangeDone is called after the
// transaction is already rolled back, and the transaction context has already been removed.
if (exchange.getException() == null)
{
log.debug("No exception occurred, skipping route suspension.");
return;
}
int deliveryCount = getRetryCount(exchange);
int redeliveryDelay = getRedeliveryDelay(deliveryCount);
log.info("Suspending route {} for {}ms after exception. Current delivery count {}.",
route.getId(), redeliveryDelay, deliveryCount);
super.suspendRoute(route);
scheduleWakeup(route, redeliveryDelay);
} catch (Exception ex) {
// only log exception and let Camel continue as of this policy didn't exist.
log.error("Exception while suspending route", ex);
}
}
void scheduleWakeup(Route route, int redeliveryDelay) {
Timer timer = new Timer();
timer.schedule(
new TimerTask() {
@Override
public void run() {
log.info("Resuming route {} after redelivery delay of {}ms.", route.getId(), redeliveryDelay);
try {
resumeRoute(route);
} catch (Exception ex) {
// only log exception and let Camel continue as of this policy didn't exist.
log.error("Exception while resuming route", ex);
}
timer.cancel();
}
},
redeliveryDelay);
}
int getRetryCount(Exchange exchange) {
Message msg = exchange.getIn();
return (int) msg.getHeader(JMSX_DELIVERY_COUNT, 1);
}
int getRedeliveryDelay(int deliveryCount) {
// very crude backoff strategy for now, will need to refine later
if (deliveryCount < 10) return 1000;
if (deliveryCount < 20) return 5000;
if (deliveryCount < 30) return 20000;
return 60000;
}
}
这就是它在路由定义中的使用方式:
from(mqConnectionString)
.routePolicy(new ExponentialBackoffPolicy())
.transacted()
...
// and if you want to distinguish between retriable and non-retriable situations, apply the following two exception handlers
onException(NonRetriableProcessingException.class)
.handled(true)
.log(LoggingLevel.WARN, "Non-retriable exception occurred, discard message.");
onException(Exception.class)
.handled(false)
.log(LoggingLevel.WARN, "Retriable exception occurred, retry message.");
需要注意的一点是,JMSXDeliveryCount
标头来自 MQ 管理器,并由此计算重新传递延迟。当您在消息永久失败时使用策略重新启动应用程序ExponentialBackoff
时,重新启动时它将立即尝试重新处理该消息,但如果发生另一个失败,则应用与重新传递总数相对应的延迟,并且不会以最初的短暂延迟重新开始.
推荐阅读
- java - testException = [null] 与 Hibernate 和 SpringMVC
- excel - Excel VBA在一定范围内调整图片大小
- php - 验证后 Laravel 旧 POST 数据为空
- javascript - 切换域时不加载 Javascript/Bootstrap/JQuery
- python - 您如何模拟未被 moto 模拟的 aws 服务?
- regex - 使用 Perl 插入美元符号
- python - 允许用户在基于类的视图中将帐户详细信息编辑到 Django 中的 Postgres
- python - “模块‘folium.features’没有属性‘CircleMarker’”
- python - ValueError: x 和 y 必须具有相同的第一维,但具有形状 (10, 1) 和 (90,)
- amazon-web-services - AWS Code Pipeline 中的验证异常