首页 > 解决方案 > Camel - 基于查询的 RoutePolicy

问题描述

我有几个在计时器上的骆驼进程和一些轮询 JMS 端点的路由。我想实现一个 RoutePolicy 来运行数据库查询以确定它是否应该继续使用该路由。可能在“onExchangeBegin”方法中,如果那是正确的位置。

我知道我可以在路由的开头进行检查,这在计时器端点中可能就足够了,但是这种方法在 JMS 轮询中不起作用,因为它会拉动 jms 有效负载。此外,我想在多条路线上共享此路线策略。

如果有人可以指出我的示例代码或方法,我将不胜感激

谢谢

标签: javaspring-bootapache-camel

解决方案


为了创建路由策略,您必须实现 RoutePolicy 接口,或使用 Camel 提供的类 org.apache.camel.support.RoutePolicySupport 以方便使用。所以你的策略类必须像

class MyRoutePolicy extends RoutePolicySupport{...}

正如您在描述中所说,onExchangeBegin在 JMS 情况下将不起作用,因为它会消耗您的消息,因此消息将丢失。

我认为可行的一种方法是禁用/启用侦听 JMS queue 的相应路由

因此,您可以从计时器开始创建一个新路由,假设每 10 秒一次,它将运行您想要的数据库查询。在这条路线中,您将应用您的策略并使用onExchangeDone. 因此,如果结果正常,则启用 JMS 路由,否则禁用它。像这样的东西:

public void onExchangeDone(Route route, Exchange exchange) {

    //check the result of the query
    ... exchange.getIn().getBody() ...

    // If result is ok (enable JMS route)
    CamelContext context = exchange.getContext();

    if(ok){
        try {
            context.startRoute("JMSRoute");
        } catch (Exception e) {
            getExceptionHandler().handleException("Exception occured during staring route ", e);
        }
    }
    // If result is not ok (disable JMS route)
    else{
        try {
            context.getInflightRepository().remove(exchange, "JMSRoute");
            context.stopRoute("JMSRoute");
        } catch (Exception e) {
            getExceptionHandler().handleException("Exception occured during stoping route ", e);
        }
    }
}

如果您想在多个路由中添加相同的路由策略,您只需在路由中添加 routePolicyRef(XML DSL) 或 routePolicy(Java DSL)。例如

<bean id="myRoutePolicy" class="com.xxx.MyRoutePolicy"/>

<route id="route1" routePolicyRef="myRoutePolicy"> ... </route>
<route id="route2" routePolicyRef="myRoutePolicy"> ... </route>
<route id="route3" routePolicyRef="myRoutePolicy"> ... </route>

或者使用 RoutePolicyFactory,在https://camel.apache.org/routepolicy.html使用 RoutePolicyFactory部分查看更多信息。


推荐阅读