首页 > 解决方案 > 如何暂停消息处理一段时间?

问题描述

我们在 Talend ESB Studio v6.4 中使用 Apache Camel

在 ESB 路由中,我们使用 JMS 消息,处理它们然后将它们发送到 HTTP 服务器。但是该目标服务器每周六下午 6 点到 10 点都会停机进行维护。

在此期间,我们如何“暂停”消息消费或消息处理?我认为石英仅适用于文件/ftp 端点。如果我们处于停机期间,我们可以使用处理器组件来检查 Java,但是在那之后该怎么办?

标签: esbtalendpause

解决方案


有几种方法可以做到这一点。一种骆驼特有的方法是通过 CamelControlBus。它接受一个 routeId 并对其执行操作(启动/停止/恢复等) - 在此处阅读更多内容以了解Camel ControlBus

但是,您可以采取另一种方法。您可以创建一个具有 3 个方法的 POJO bean

  • shouldRouteStop() :检查当前时间并决定是否应该停止您的路线。
  • startRoute() :如果路由被挂起,则启动它
  • stopRoute() :如果已启动,则暂停路由

一个简单的实现可以如下:

public class ManagementBean {
 public boolean shouldRouteStop() {
    // Mocking the decision here
    return new Random().nextBoolean();
 }
 public void startRoute(org.apache.camel.CamelContext ctx) throws Exception {
    if (ctx.getRouteStatus("GenerateInvoices") == ServiceStatus.Suspended)
        // replace the argument with your route Id
        ctx.resumeRoute("GenerateInvoices");
 }
 public void stopRoute(org.apache.camel.CamelContext ctx) throws Exception {
    if (ctx.getRouteStatus("GenerateInvoices") == ServiceStatus.Started)
        // replace the argument with your route Id
        ctx.suspendRoute("GenerateInvoices");
 }
}

确保您希望控制的 jms-route 具有 routeId 并将此 bean 添加到您的基本/默认 CamelContext 中,如下所示

main.bind("manageRouteBean", new ManagementBean());

创建另一个基于计时器的路由,检查每个滴答声,是否应该停止路由,然后通过 routeId 暂停或恢复路由。这条路线可以像下面这样实现:

public class MonitoringRoute extends RouteBuilder {
 @Override
 public void configure() throws Exception {
    onException(Exception.class).log(exceptionMessage().toString());

    from("timer:time?period=10000")
            .choice()
            .when().simple("${bean:manageRouteBean?method=shouldRouteStop}")
            .log("Route Should Stop")
            .bean(ManagementBean.class, "stopRoute(*)")
            .otherwise()
            .log("Route Should Start")
            .bean(ManagementBean.class, "startRoute(*)")
            .end();
 }
}

请注意,startRoute并将stopRoute参数作为 *。这是基于类型自动绑定参数的骆驼方式。

最后,您可以将此路线添加到主骆驼上下文中,例如:main.addRouteBuilder(new MonitoringRoute());

对于一个完整的实现,看看这个github repo


推荐阅读