java - 如何在 rxJava 中不断尝试直到成功
问题描述
我想继续尝试建立 TCP 连接,并在成功时对该连接进行操作。
假设这是我的界面:
public interface ConnectionFactory {
rx.Observable<Socket> createConnection(String host, int port);
}
我想这样使用它:
ConnectionFactory factory = new ConnectionFactoryImpl();
rx.Observable<Socket> rxSocket = factory.createConnection("abc.mycomp.com", 9999);
rxSocket.subscribe(...);
我的问题是 - ConnectionFactory.create(String, int) 的实现将如何?不寻找工作代码,但指针会有所帮助。rxJava 1.x 中是否有带有重试的轮询机制?
它必须每 10 秒继续尝试连接(比如说),直到成功,然后调用subscriber.onNext(Socket)。可能会抑制 IOException/任何其他连接异常。
解决方案
您可以按如下方式实现 ConnectionFactory:
class ConnectionFactoryImpl implements ConnectionFactory {
@Override
public rx.Observable<Socket> createConnection(String host, int port) {
Single<Socket> single = Single.create(sub -> {
try {
Socket socket = null;
//create your socket
sub.onSuccess(socket);
} catch (Exception e) {
sub.onError(e);
}
});
return single.toObservable();
}
}
要每 10 秒重试一次连接,您可以执行以下操作:
ConnectionFactory factory = new ConnectionFactoryImpl();
rx.Observable<Socket> rxSocket = Observable.interval(10, TimeUnit.SECONDS)
.flatMap(tick -> factory.createConnection("abc.mycomp.com", 9999)
.map(socket -> Optional.of(socket))
.onErrorReturn(throwable -> Optional.empty())
.filter(op -> op.isPresent())
.map(op -> op.get())
)
.take(1);
rxSocket
.subscribe(socket -> {
});
请注意,默认情况下Observable.interval()
运行Schedulers.computation()
,这意味着rxSocket
异步执行。
推荐阅读
- orientdb - 尝试连接到本地 orientdb 数据库时,“配置文件格式无效”
- mongodb - MongoDB 对匹配所有值的电子邮件地址进行全文搜索,并可能修复花费太多时间
- java - 使用 Java 格式文本创建 Excel
- c++ - 用 cv::Point2f 类型重载 std::sort?
- transform - How to add collections in transformations when writing(creating) a Document in MarkLogic
- mongodb - 使用 Spring Boot 从 MongoDB 存储库中选择嵌套属性
- java - 尝试将导航抽屉按钮添加到应用栏
- java - 以编程方式静默安装 APK
- hyperledger-fabric - Channel creation fails in Hyperledger Fabric v1.2
- java - 按位置划分的不当 Google 自定义搜索结果