rx-java - 使用 RX java 运行一个永恒的后台进程
问题描述
我正在尝试使用需要侦听消息代理的 RX java 构建应用程序。此侦听器应始终处于启动状态,并且不得阻塞主线程。
我正在使用以下代码来创建 observable,它会在收到消息后立即发出消息。
public Observable<Message<byte[]>> getObservable(){
return Observable.create((ObservableOnSubscribe<Message<byte[]>>) subscriber -> {
if (!subscriber.isDisposed()) {
Scheduler.Worker deliveryWorker = **Schedulers.io().createWorker()**;
deliveryWorker.schedule(() -> {
Message<byte[]> receivedMessages = destination.receive();
subscriber.onNext(receivedMessages );
});
}
})
.**observeOn(Schedulers.newThread())**;
} }
现在,当我执行代码时,我看到程序运行了一秒钟然后完成。
我希望它永远运行,因为从不调用subscriber.onComplete()。 我是否遗漏了什么,需要做什么才能让这个工作人员永远运行并接收消息?
我在这里想念什么?需要做什么才能使 observable 永远启动并运行?
我在 Eclipse 中进行了测试,发现在 main 方法执行后没有线程处于活动状态;
但是,如果我使用以下代码将 observable 更改为阻塞 observable,则该进程不会退出并永远运行。
getObservable().blockingSubscribe(message->{print(message);})
doSomething(); // this code is not executing
但同时这会阻止主线程进一步执行。这不是我想要的。
什么可以解决我的问题?
解决方案
推荐阅读
- javascript - 浏览器实际上是如何将 DOM 数据存储在最低级别的?它是存储为表还是树?
- xmpp - Ejabberd - ejabberd_auth_external:failure:103 调用“check_password”时外部身份验证程序失败
- reactjs - 反应:
与数组 - python - 通过 Python sdk 将所有者添加到 Azure AD 组
- python - 为什么在 pandas 中设置列的值不起作用>
- c - 如何用特定值填充非紧凑结构的所有填充字节?
- c# - 如果我在使用中使用 null 进行初始化,我还会获得 Dispose 优势吗?
- asp.net-mvc - Angular 7 CLI 构建:如何省略模板才能使用 MVC 视图?
- python - 在 ARM64 docker 容器中编译 typed-ast
- python-3.x - getattr 在迭代 webdriver 对象时报告意外异常