java - rxjava 结合了 2 个调用和错误处理,延迟失败
问题描述
用例是,有两个数据源:
- 服务 1 - 从 source-1 获取
- 服务 2 - 从 source-2 获取
该应用程序应至少从 source-1 返回数据。如果 source-2 一切正常 - 数据将被“增强”,例如乘以 100。
服务 1 调用服务 2。
如果所有成功的用户都从服务 1 和服务 2 获取数据,如果服务 2 出现错误,则用户仅从服务 1 获取数据(至少)如果服务 1 出现错误 - 用户将收到错误。
有 hello-world-bench 代码可以模拟这种情况:
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
class Response {
public Integer value;
public String warning;
public Response(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Response{" +
"value=" + value +
", warning='" + warning + '\'' +
'}';
}
}
class Service1 {
public Observable<Response> call(int arg) {
return Observable
.just(
new Response(1),
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS);
}
}
class Service2 {
public Observable<Response> call(int arg) {
if ( arg % 2 == 0) {
System.out.println("service 2: " + arg);
return Observable
.just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1
.delay(10, TimeUnit.MILLISECONDS);
} else {
System.out.println("service 2: " + arg);
return Observable.error(new RuntimeException("service 2 error"));
}
}
}
public class Step1 {
static Service1 service1 = new Service1();
static Service2 service2 = new Service2();
public static void main(String[] args) throws InterruptedException {
var oo1 = service1.call(1);
var oo3 = oo1.switchMapDelayError(x -> {
final Observable<Response> oo2 = service2.call(x.value);
return oo2
.onErrorReturn((ex) -> {
//System.out.println("Error handling..." + ex.getMessage() + " " + x);
x.warning = ex.getMessage();
return x; // returns at least service1 result
});
});
oo3.subscribe(x -> {
System.out.println(x);
});
Thread.sleep(100000);
}
}
这段代码的结果是:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
问题是:没有预期:value=200
2*100
然而,如果我在 service2.call() //.delay(10, TimeUnit.MILLISECONDS) 评论延迟,那么它会得到预期的结果:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
问题是:为什么.delay(10, TimeUnit.MILLISECONDS) on service2.call()
它不能产生 value=200 ?该解决方案有什么问题,我想念什么?
谢谢。
解决方案
你的问题是switchMapDelayError
运营商。您应该使用 concatMap 或 flatMap
我冒昧地为您的用例编写了一个测试。请注意,始终使用重载来提供 aScheduler
以便TestScheduler
为测试提供 a 。
switchMap 有什么作用?
在每个上游发出 switchMap 订阅给定的内部流。当上游发出一个新值时,旧的内部流被取消订阅,并且再次调用 switchMap 的 lambda 以订阅新的内部流。
问题可能是这段代码:
return Observable
.just(
new Response(1),
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS);
它几乎立即一个接一个地在堆栈上发出响应 1 到 4,并且每个发出都在另一个线程上延迟。因此响应 1 到 4 将几乎立即发出。它们不会像这样发出:响应(1)在 100 毫秒,响应(2)在 200 毫秒等。
让我们看看输出是干什么用的
Observable.just(
new Response(1), //
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS)
.subscribe(r -> {
System.out.println("received value at " + Schedulers.io().now(TimeUnit.MILLISECONDS));
});
输出
received value at 1607432032768
received value at 1607432032769
received value at 1607432032769
received value at 1607432032769
因此,所有值几乎立即发出并用 switchMap 相互覆盖。先前发出的值几乎立即被新值取消。
解决方案
使用 concatMap 或 flatMap 或更改您的测试设置以 100 毫秒的间隔发出每个值。
flatMap 只订阅每个值,默认情况下最多订阅 128 个内部流。当内部流完成时,ConcatMap 只会订阅下一个值。
测试
public class So65193002 {
@Test
void so() {
TestScheduler testScheduler = new TestScheduler();
Service1 service1 = new Service1(testScheduler);
Service2 service2 = new Service2(testScheduler);
Observable<Response> service1Call = service1.call(1);
Observable<Response> combined =
service1Call.concatMapEagerDelayError(
x -> {
return service2
.call(x.value)
.onErrorReturn(
(ex) -> {
x.warning = ex.getMessage();
return x; // returns at least service1 result
});
},
true);
TestObserver<Response> test = combined.test();
testScheduler.advanceTimeBy(1, TimeUnit.HOURS);
test.assertValueCount(4)
.assertValueAt(
0,
r -> {
assertThat(r.value).isEqualTo(1);
assertThat(r.warning).isNotEmpty();
return true;
})
.assertValueAt(
1,
r -> {
assertThat(r.value).isEqualTo(200);
assertThat(r.warning).isNull();
return true;
})
.assertValueAt(
3,
r -> {
assertThat(r.value).isEqualTo(400);
assertThat(r.warning).isNull();
return true;
});
}
}
领域
class Response {
public Integer value;
public String warning;
public Response(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Response{" + "value=" + value + ", warning='" + warning + '\'' + '}';
}
}
class Service1 {
private final Scheduler scheduler;
Service1(Scheduler scheduler) {
this.scheduler = scheduler;
}
public Observable<Response> call(int arg) {
return Observable.just(
new Response(1), //
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS, scheduler);
}
}
class Service2 {
private final Scheduler scheduler;
Service2(Scheduler scheduler) {
this.scheduler = scheduler;
}
public Observable<Response> call(int arg) {
if (arg % 2 == 0) {
return Observable.just(new Response(100 * arg)).delay(10, TimeUnit.MILLISECONDS, scheduler);
} else {
return Observable.error(new RuntimeException("service 2 error"));
}
}
}
笔记
不要使用可变对象。始终确保发出的值是不可变的,否则您会遇到麻烦。
推荐阅读
- cmake - CMake 工具链文件:有没有办法指定包含和定义的语法?
- angular - 如果没有匹配的子路由,角度路由应该会失败
- javascript - 如何使用 CSS 为与父 div 标签相关的嵌套 div 标签提供边距?
- amazon-web-services - AWS Cloud9:你能停止环境吗?
- omnet++ - 如何在静脉应用程序顶部添加路由功能?
- google-cloud-firestore - 使用存储桶恢复聊天室
- react-native - 如何在本机反应中将 Canvas 签名图像上传到 php 服务器
- python - Python csv获取原始原始数据行
- clang++ - 如何用clang++编译c++20模块头单元
- php - 无法将客户端(android + php)与在流明中创建的节点服务器连接