首页 > 解决方案 > MQTT Apache Camel 路由中的错误导致应用程序停止

问题描述

我正在使用带有 xml 的Apache Camel 和 Spring(不是 Spring Boot)。

我有我的 camel-context.xml 配置文件,其中包含从 MQTT 服务器到 JMS 服务器的示例路由,反之亦然,只是为了发送消息。

这是我的骆驼上下文.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="quadtreeProcessor" class="es.gateway.router.QuadtreeProcessor" />

    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
        <route id="quadTreeConsumerProducer">
            <from uri="jms:topic:T_ETSI_PRODUCER"/>
            <process ref="quadtreeProcessor"/>
            <to uri="mqtt:quadtree?host=tcp://localhost:1883&amp;publishTopicName=${header.publishTopicName}"/>
        </route>

        <route id="quadTreeConsumerRoute">
            <from uri="mqtt:quadtree?host=tcp://localhost:1883&amp;subscribeTopicName=CONSUMER/DENM/#"/>
            <to uri="jms:topic:T_ETSI_CONSUMER"/>
        </route>
    </camelContext>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:10011"/>
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" />

这是我的 Main.class:

package es.conncar.main;

import org.apache.camel.RuntimeCamelException;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainListenerSupport;
import org.apache.camel.main.MainSupport;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import es.gateway.config.Config;

public class Main {

    final static Logger logger = Logger.getLogger(Main.class);

    private static boolean exit = false;

    private org.apache.camel.spring.Main camelMain;

    public static void main(String[] args) throws Exception {
        try {
            ApplicationContext springContext = new ClassPathXmlApplicationContext("app-context.xml");
            logger.info("Spring context initialized");

            Config config = (Config) springContext.getBean("config");

            Main main = new Main();
            main.boot();
        } catch (Exception e) {
            logger.error("Unknown error in main", e);
        }
    }

    public void boot() throws Exception {
        camelMain = new org.apache.camel.spring.Main();
        camelMain.addMainListener((MainListener) new Events());
        camelMain.setApplicationContextUri("camel-context.xml"); 
        logger.info("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        camelMain.run();
    }

    public static class Events extends MainListenerSupport {

        @Override
        public void afterStart(MainSupport main) {
            logger.info("Camel is now started!");
        }

        @Override
        public void beforeStop(MainSupport main) {
            logger.info("Camel is now being stopped!");
        }
    }
}

在我的 Main 类中运行相同的路由和配置时会出现问题,由于RuntimeCamelException几秒钟后就死了。

异常跟踪是:

骆驼开始正常

[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Spring context initialized
[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Starting Camel. Use ctrl + c to terminate the JVM. 

10 秒后

[30/11/2018 08:37:21][ERROR][es.gateway.main.Main] - Unknown error in Camel
    org.apache.camel.RuntimeCamelException: java.util.concurrent.TimeoutException
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:136)
        at org.apache.camel.spring.CamelContextFactoryBean.start(CamelContextFactoryBean.java:369)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:416)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:94)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:883)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:93)
        at org.apache.camel.spring.Main.createDefaultApplicationContext(Main.java:222)
        at org.apache.camel.spring.Main.doStart(Main.java:154)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.main.MainSupport.run(MainSupport.java:170)
        at es.gateway.main.Main.boot(Main.java:105)
        at es.gateway.main.Main.main(Main.java:77)
    Caused by: java.util.concurrent.TimeoutException
        at org.fusesource.mqtt.client.Promise.await(Promise.java:83)
        at org.apache.camel.component.mqtt.MQTTEndpoint.connect(MQTTEndpoint.java:348)
        at org.apache.camel.component.mqtt.MQTTConsumer.doStart(MQTTConsumer.java:38)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:3705)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:4023)
        at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:3958)
        at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3878)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3642)
        at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3494)
        at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:209)
        at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3253)
        at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3249)
        at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3272)
        at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3249)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3165)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:133)
        ... 18 more

当无法连接到 MQTT 代理时,看起来路由会引发超时(没关系),但是这个异常没有在路由中捕获,而是在上下文中 - 主类(这不好)。

我遵循了这本食谱:

http://camel.apache.org/running-camel-standalone-and-have-it-keep-running.html

org.apache.camel.spring.Main类中使用骆驼弹簧 JAR 。

我还检查了 Apache Camel in Action 一书中的第 13 章,但我没有找到解决方案。看来我正在启动和配置骆驼上下文就好了。

有人有这个经验吗?当路由中发生 RuntimeExceptions 时,有没有办法让路由工作和主程序保持活动状态?但愿如此!

提前谢谢!

编辑:我发现了几个谈论这个的话题。解决方案似乎是激活监督控制器,但我找不到没有 Spring Boot 的方法(我使用的是带有混合 xml 和注释配置的普通 Spring)。有人可以帮忙吗?

即使无法访问 MQTT 服务器如何启动骆驼

当activemq连接丢失时,Spring Context关闭骆驼路由

EDIT2:我已经检查过问题仅出在 MQTT上。JMS 连接正常工作,即使在 JMS 代理关闭并优雅地处理重新连接时也能够继续,而 MQTT 连接则不能。

EDIT3:Paho 工作正常,但普通 MQTT 不行。我已经在 xml config (uri) 中使用 Paho 测试了相同的代码,它的行为符合预期,类似于 JMS。该路由会引发异常但继续运行,它不会将异常引发到导致应用程序停止的上下文中。也许我在 MQTT 客户端中缺少选项?

标签: springapache-cameljmsmqtt

解决方案


我建议使用 camel-paho,因为 Eclipse Paho 比 camel-mqtt 使用的旧 FuseSource MQTT 客户端库更活跃。

也就是说,camel-mqtt 组件的问题在于它在启动时需要一个工作连接。您可以配置其重新连接选项以设置各种延迟等。

还有一种替代的路由启动机制,它允许使用一个后台线程来启动路由,该线程监控路由并可以处理重试等。SupervisingRouteController注意,围绕它的 JMX 管理功能还有一些事情需要实现,但除此之外应该是美好的。而且它缺乏更适当的文档。我们正在考虑使其在 Camel 3 中更加突出或作为默认设置。

这里有一个例子:https ://github.com/apache/camel/tree/master/examples/camel-example-spring-boot-supervising-route-controller


推荐阅读