首页 > 解决方案 > 使用 RX java 运行一个永恒的后台进程

问题描述

我正在尝试使用需要侦听消息代理的 RX java 构建应用程序。此侦听器应始终处于启动状态,并且不得阻塞主线程。

我正在使用以下代码来创建 observable,它会在收到消息后立即发出消息。

public Observable<Message<byte[]>>  getObservable(){ 
return Observable.create((ObservableOnSubscribe<Message<byte[]>>) subscriber -> {
        if (!subscriber.isDisposed()) { 
                Scheduler.Worker deliveryWorker = **Schedulers.io().createWorker()**;  
                deliveryWorker.schedule(() -> {
                Message<byte[]>  receivedMessages = destination.receive();
              subscriber.onNext(receivedMessages );
                  });

        }

    })   
 .**observeOn(Schedulers.newThread())**;
} }

现在,当我执行代码时,我看到程序运行了一秒钟然后完成。

我希望它永远运行,因为从不调用subscriber.onComplete()。 我是否遗漏了什么,需要做什么才能让这个工作人员永远运行并接收消息?

我在这里想念什么?需要做什么才能使 observable 永远启动并运行?

我在 Eclipse 中进行了测试,发现在 main 方法执行后没有线程处于活动状态;

但是,如果我使用以下代码将 observable 更改为阻塞 observable,则该进程不会退出并永远运行。

 getObservable().blockingSubscribe(message->{print(message);})
 doSomething(); // this code is not executing 

但同时这会阻止主线程进一步执行。这不是我想要的。

什么可以解决我的问题?

标签: rx-javarx-java2rx-androidreactive-streams

解决方案


推荐阅读