Error receiving messages from a RabbitMQ broker when using Siddhi as a Java library

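Problem description

I am building an Android application with Siddhi. I use RabbitMQ for communication between the app and a server application, but I cannot retrieve messages from the broker.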

dependencies {
    ....
    implementation('io.siddhi.extension.io.rabbitmq:siddhi-io-rabbitmq:3.0.6') {
        transitive = false
    }

    implementation('io.siddhi.extension.map.json:siddhi-map-json:5.0.7') {
        transitive = false
    }

    // I added this line so that the siddhi-map-json dependency works
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.0'
}

When I try to retrieve messages from the broker, I get the following error:

ERROR io.siddhi.core.stream.input.source.Source  - Error on 'communicationServer'. Error in receiving the message from the RabbitMQ broker in io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper@a8b0f11 Error while connecting at Source 'rabbitmq' at 'inputStream'.

 at io.siddhi.extension.io.rabbitmq.source.RabbitMQSource.connect(RabbitMQSource.java:394)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at io.siddhi.core.SiddhiAppRuntimeImpl.startSources(SiddhiAppRuntimeImpl.java:525)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at io.siddhi.core.SiddhiAppRuntimeImpl.start(SiddhiAppRuntimeImpl.java:450)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at uca.es.tfgapp.MainActivity.serverListener(MainActivity.java:200)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at uca.es.tfgapp.MainActivity.prepareDetection(MainActivity.java:517)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at uca.es.tfgapp.MainActivity.onCreate(MainActivity.java:155)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.Activity.performCreate(Activity.java:7224)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.Activity.performCreate(Activity.java:7213)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1272)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2926)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3081)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1831)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.os.Handler.dispatchMessage(Handler.java:106)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.os.Looper.loop(Looper.java:201)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.app.ActivityThread.main(ActivityThread.java:6810)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at java.lang.reflect.Method.invoke(Native Method)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:547)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:873)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: Caused by: android.os.NetworkOnMainThreadException
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1513)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out:     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:389)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:230)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:212)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:436)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at java.net.Socket.connect(Socket.java:621)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:811)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:767)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:857)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     at io.siddhi.extension.io.rabbitmq.source.RabbitMQSource.connect(RabbitMQSource.java:368)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out:     ... 21 more
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: 5561 [main] ERROR io.siddhi.core.SiddhiAppRuntimeImpl  - Error starting Siddhi App 'communicationServer', triggering shutdown process. Error on 'communicationServer'. Error in receiving the message from the RabbitMQ broker in io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper@a8b0f11

This is my Siddhi query:

siddhiServer = "@app:name('communicationServer') " +
        "@source(type='rabbitmq'," +
        "uri='amqp://tfgServer:tfgServer@192.168.1.158:5672'," +
        "exchange.name='prueba'," +
        "queue.name='cola_receptor2'," +
        "@map(type='json')) " +
        "define stream inputStream(result int); " +
        "from inputStream select * insert into outputResult";

siddhiRuntimeServer = siddhiManager.createSiddhiAppRuntime(siddhiServer);

siddhiRuntimeServer.start();
siddhiRuntimeServer.addCallback("outputResult", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        EventPrinter.print(events);
    }
});

Does anyone know what I am doing wrong?

Tags: java, android, rabbitmq, siddhi

Solution
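
The stack trace in the question ends with "Caused by: android.os.NetworkOnMainThreadException". Its frames run from MainActivity.onCreate through SiddhiAppRuntimeImpl.start and RabbitMQSource.connect down to java.net.Socket.connect, so the RabbitMQ connection is being opened on the Android main thread, which Android does not allow. One likely fix is to call start() from a worker thread instead. The following is a minimal sketch of that idea in plain Java; BackgroundStarter and startInBackground are hypothetical names introduced here for illustration, not part of Siddhi or Android.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical helper (not part of Siddhi or Android): runs a blocking
 * start action on a worker thread so the calling thread never opens a socket.
 */
final class BackgroundStarter {
    // A single worker thread is enough for a one-off start call.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Submits the action to the worker thread and returns a Future for its completion. */
    Future<?> startInBackground(Runnable startAction) {
        return executor.submit(startAction);
    }

    /** Lets the worker thread exit once submitted work has finished. */
    void shutdown() {
        executor.shutdown();
    }
}
```

Under these assumptions, the activity would replace the direct siddhiRuntimeServer.start() call with something like starter.startInBackground(() -> siddhiRuntimeServer.start()). It may also be worth registering the StreamCallback with addCallback before starting the runtime, so that no early events are missed; that ordering is a suggestion, not something the error message requires. Note that the callback's receive method would then run off the main thread, so any UI update made from it has to be posted back to the main thread.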
