首页 > 解决方案 > 如何使用 Camel 和 IBM MQ 配置消费者级别的事务重新传递

问题描述

我正在尝试使用连接到 IBM MQ 的 Apache Camel 在 Java Spring Boot 中完成事务 JMS 客户端。此外,当消息处理失败时,客户端需要应用指数退避重新传递行为。原因:来自 MQ 的消息需要处理并转发到可能停机维护数小时的外部系统。使用事务来保证至少一次处理保证对我来说似乎是合适的解决方案。

我已经研究了这个话题很多小时,但一直没有找到解决方案。我将从我目前拥有的东西开始:

  @Bean
  UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
      throws IOException {
    MQConnectionFactory factory = new MQConnectionFactory();
    factory.setCCDTURL(tabFilePath);

    UserCredentialsConnectionFactoryAdapter adapter =
        new UserCredentialsConnectionFactoryAdapter();
    adapter.setTargetConnectionFactory(factory);
    adapter.setUsername(userName);
    bentechConnectionFactoryAdapter.setPassword(password);

    return adapter;
  }

  @Bean
  PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
    JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
    return txMgr;
  }

  @Bean()
  CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,
      @Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
    return new CamelContextConfiguration() {
      @Override
      public void beforeApplicationStart(CamelContext context) {
        JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter, txMgr);
        // required for consumer-level redelivery after rollback
        jmsComponent.setCacheLevelName("CACHE_CONSUMER");
        jmsComponent.setTransacted(true);
        jmsComponent.getConfiguration().setConcurrentConsumers(1);

        context.addComponent("jms", jmsComponent);
      }

      @Override
      public void afterApplicationStart(CamelContext camelContext) {
        // Do nothing
      }
    };
  }

// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
    .transacted()
    .("direct:processMessage");
...

我能够通过集成测试验证事务行为。如果在消息处理期间发生未处理的异常,事务将回滚并重试。问题是,它会立即重试,每秒几次,可能会导致 IBM MQ 管理器和外部系统的负载过重。

对于 ActiveMQ,重新投递策略很容易做到,网上有很多例子。ActiveMQConnectionFactory有一个setRedeliveryPolicy方法,这意味着 ActiveMQ 客户端库内置了重新传递逻辑。据我所知,这与 Camel 的Transactional Client EIP 的文档一致,其中指出:

事务模式下的重新交付不是由 Camel 处理的,而是由支持系统(事务管理器)处理的。在这种情况下,您应该求助于支持系统如何配置重新交付。

我绝对想不通的是如何为 IBM MQ 实现同样的目标。IBM 的MQConnectionFactory不支持重新传递策略。事实上,在redeliverypolicyMQ 知识中心中搜索会准确地找到...击鼓... 0 次点击。我什至浏览了一下 MQConnectionFactory 的实现,也没有发现任何东西。

我研究的另一个支持系统是JmsTransactionManager。搜索“jmstransactionmanager 重新交付策略”或“jmstransactionmanager 指数退避”也没有发现任何有用的信息。有一些讨论TransactionTemplateAbstractMessageListenerContainer但 1)我没有看到与重新交付政策有任何联系,以及 2)我无法弄清楚这些政策如何与 Camel 和 JMS 交互。

Sooo,有人知道如何使用 Apache Camel 和 IBM MQ 实现指数退避重新交付策略吗?

结束语:Camel 支持重新交付策略,errorHandler并且事务/连接支持系统中的重新交付策略不同onException。这些处理程序使用处于任何状态的“Exchange”对象在故障点重试,而不会从路由开始回滚和重新处理消息。事务在整个重试期间保持活动状态,只有当or放弃时才会发生回滚。对于可能持续数小时的重试,这不是我想要的。errorHandleronException

标签: apache-cameljmsibm-mqspring-jmsmq

解决方案


看起来@JoshMc 为我指出了正确的方向。我设法实施了一个RoutePolicy随着延迟增加而延迟重新交付的方法。我已经运行了几个小时的测试会话和数千次相同消息的重新传递,以查看是否存在内存泄漏、MQ 连接耗尽等问题。我没有观察到任何问题。到 MQ 管理器有两个稳定的 TCP 连接,Java 进程的内存使用在一个很近的范围内移动。

import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Session;
import lombok.extern.log4j.Log4j2;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Route;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.support.RoutePolicySupport;

@Log4j2
public class ExponentialBackoffPolicy extends RoutePolicySupport implements CamelContextAware {
  final static String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
  private CamelContext camelContext;

  @Override
  public void setCamelContext(CamelContext camelContext) {
    this.camelContext = camelContext;
  }

  @Override
  public CamelContext getCamelContext() {
    return this.camelContext;
  }

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    try {
      // ideally we would check if the exchange is transacted but onExchangeDone is called after the
      // transaction is already rolled back, and the transaction context has already been removed.
      if (exchange.getException() == null)
      {
        log.debug("No exception occurred, skipping route suspension.");
        return;
      }

      int deliveryCount = getRetryCount(exchange);
      int redeliveryDelay = getRedeliveryDelay(deliveryCount);
      log.info("Suspending route {} for {}ms after exception. Current delivery count {}.",
          route.getId(), redeliveryDelay, deliveryCount);

      super.suspendRoute(route);
      scheduleWakeup(route, redeliveryDelay);
    } catch (Exception ex) {
      // only log exception and let Camel continue as of this policy didn't exist.
      log.error("Exception while suspending route", ex);
    }
  }

  void scheduleWakeup(Route route, int redeliveryDelay) {
    Timer timer = new Timer();
    timer.schedule(
        new TimerTask() {
          @Override
          public void run() {
            log.info("Resuming route {} after redelivery delay of {}ms.", route.getId(), redeliveryDelay);
            try {
              resumeRoute(route);
            } catch (Exception ex) {
              // only log exception and let Camel continue as of this policy didn't exist.
              log.error("Exception while resuming route", ex);
            }
            timer.cancel();
          }
        },
        redeliveryDelay);
  }

  int getRetryCount(Exchange exchange) {
    Message msg = exchange.getIn();
    return (int) msg.getHeader(JMSX_DELIVERY_COUNT, 1);
  }

  int getRedeliveryDelay(int deliveryCount) {
    // very crude backoff strategy for now, will need to refine later
    if (deliveryCount < 10) return 1000;
    if (deliveryCount < 20) return 5000;
    if (deliveryCount < 30) return 20000;
    return 60000;
  }
}

这就是它在路由定义中的使用方式:

    from(mqConnectionString)
        .routePolicy(new ExponentialBackoffPolicy())
        .transacted()
        ...

    // and if you want to distinguish between retriable and non-retriable situations, apply the following two exception handlers
    onException(NonRetriableProcessingException.class)
        .handled(true)
        .log(LoggingLevel.WARN, "Non-retriable exception occurred, discard message.");

    onException(Exception.class)
        .handled(false)
        .log(LoggingLevel.WARN, "Retriable exception occurred, retry message.");

需要注意的一点是,JMSXDeliveryCount标头来自 MQ 管理器,并由此计算重新传递延迟。当您在消息永久失败时使用策略重新启动应用程序ExponentialBackoff时,重新启动时它将立即尝试重新处理该消息,但如果发生另一个失败,则应用与重新传递总数相对应的延迟,并且不会以最初的短暂延迟重新开始.


推荐阅读