java - 从 RabbitMQ 代理 Siddhi 作为 java 库接收消息时出错
问题描述
我正在用 Siddhi 做一个 android 应用程序。我正在使用 rabbitmq 在应用程序和服务器应用程序之间进行通信。但我无法从经纪人那里检索消息。
dependencies{
....
implementation ('io.siddhi.extension.io.rabbitmq:siddhi-io-rabbitmq:3.0.6'){
transitive = false
}
implementation ('io.siddhi.extension.map.json:siddhi-map-json:5.0.7'){
transitive = false
}
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.0' //=> I add this line to work the siddhi map json dependency
}
当我尝试从代理检索消息时,出现以下错误:
ERROR io.siddhi.core.stream.input.source.Source - Error on 'communicationServer'. Error in receiving the message from the RabbitMQ broker in io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper@a8b0f11 Error while connecting at Source 'rabbitmq' at 'inputStream'.
at io.siddhi.extension.io.rabbitmq.source.RabbitMQSource.connect(RabbitMQSource.java:394)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at io.siddhi.core.SiddhiAppRuntimeImpl.startSources(SiddhiAppRuntimeImpl.java:525)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at io.siddhi.core.SiddhiAppRuntimeImpl.start(SiddhiAppRuntimeImpl.java:450)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at uca.es.tfgapp.MainActivity.serverListener(MainActivity.java:200)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at uca.es.tfgapp.MainActivity.prepareDetection(MainActivity.java:517)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at uca.es.tfgapp.MainActivity.onCreate(MainActivity.java:155)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.Activity.performCreate(Activity.java:7224)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.Activity.performCreate(Activity.java:7213)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1272)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2926)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3081)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1831)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.os.Handler.dispatchMessage(Handler.java:106)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.os.Looper.loop(Looper.java:201)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.app.ActivityThread.main(ActivityThread.java:6810)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at java.lang.reflect.Method.invoke(Native Method)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:547)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:873)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: Caused by: android.os.NetworkOnMainThreadException
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1513)
2020-06-24 01:47:48.575 30884-30884/uca.es.tfgapp I/System.out: at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:389)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:230)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:212)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:436)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at java.net.Socket.connect(Socket.java:621)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:811)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:767)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:857)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: at io.siddhi.extension.io.rabbitmq.source.RabbitMQSource.connect(RabbitMQSource.java:368)
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: ... 21 more
2020-06-24 01:47:48.576 30884-30884/uca.es.tfgapp I/System.out: 5561 [main] ERROR io.siddhi.core.SiddhiAppRuntimeImpl - Error starting Siddhi App 'communicationServer', triggering shutdown process. Error on 'communicationServer'. Error in receiving the message from the RabbitMQ broker in io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper@a8b0f11
这是我的 siddhi 查询:
siddhiServer = "@app:name('communicationServer') " +
"@source(type='rabbitmq'," +
"uri='amqp://tfgServer:tfgServer@192.168.1.158:5672'," +
"exchange.name='prueba'," +
"queue.name='cola_receptor2',"+
"@map(type='json')) " +
"define stream inputStream(result int); " +
"from inputStream select * insert into outputResult";
siddhiRuntimeServer = siddhiManager.createSiddhiAppRuntime(siddhiServer);
siddhiRuntimeServer.start();
siddhiRuntimeServer.addCallback("outputResult", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
}
});
有人知道我在做什么错吗?
解决方案
推荐阅读
- push-notification - 任何适用于独立 Apple Watch 应用程序的推送通知服务?
- ios - 使用解码协议解析 Json(可编码)
- python - QChart 添加轴不显示,当悬停信息不正确时?
- reactjs - 无法创建新项目..遇到错误
- ios - Xcode 11:如何垂直堆叠编辑器?
- r - R:将隐式缺失值和组填充到数据的整个时间跨度
- go - 正向代理负载均衡器/故障转移
- python - 两个单词列表之间的比较
- reactjs - 如何使用 this.props.history 重定向 reactjs
- python - with.. if else.. as 语句单行python