首页 > 解决方案 > 如何在 rxJava 中不断尝试直到成功

问题描述

我想继续尝试建立 TCP 连接,并在成功时对该连接进行操作。

假设这是我的界面:

public interface ConnectionFactory {

    rx.Observable<Socket> createConnection(String host, int port);

}

我想这样使用它:

ConnectionFactory factory = new ConnectionFactoryImpl();

rx.Observable<Socket> rxSocket = factory.createConnection("abc.mycomp.com", 9999);

rxSocket.subscribe(...);

我的问题是 - ConnectionFactory.create(String, int) 的实现将如何?不寻找工作代码,但指针会有所帮助。rxJava 1.x 中是否有带有重试的轮询机制?

它必须每 10 秒继续尝试连接(比如说),直到成功,然后调用subscriber.onNext(Socket)。可能会抑制 IOException/任何其他连接异常。

标签: javarx-java

解决方案


您可以按如下方式实现 ConnectionFactory:

class ConnectionFactoryImpl implements ConnectionFactory {

    @Override
    public rx.Observable<Socket> createConnection(String host, int port) {
        Single<Socket> single = Single.create(sub -> {
            try {
             Socket socket = null;
             //create your socket
             sub.onSuccess(socket); 
         } catch (Exception e) {
             sub.onError(e);
         }
         });
         return single.toObservable();
    }
}

要每 10 秒重试一次连接,您可以执行以下操作:

ConnectionFactory factory = new ConnectionFactoryImpl();

rx.Observable<Socket> rxSocket = Observable.interval(10, TimeUnit.SECONDS)
    .flatMap(tick -> factory.createConnection("abc.mycomp.com", 9999)
                    .map(socket -> Optional.of(socket))
                    .onErrorReturn(throwable -> Optional.empty())
                    .filter(op -> op.isPresent())
                    .map(op -> op.get())
            )
    .take(1);

rxSocket
    .subscribe(socket -> {

    });

请注意,默认情况下Observable.interval()运行Schedulers.computation(),这意味着rxSocket异步执行。


推荐阅读