java - 在 RxJava 中拉取实现
问题描述
我正在开发一个测试库,我想使用 RxJava 实现一个拉取实现
我有一个流(所有可用的测试),订阅了几个观察者(所有测试设备),我希望当每个订阅者完成一个元素的处理时,向流请求一个新元素,并且所有流元素都应该只处理一个消费者。
我想知道这是否可以使用热可观察和使用背压技术来实现,但我不太确定这一点:(
有任何想法吗?
解决方案
最后,在@akarnokd 的帮助下,我找到了解决方案。这是代码:
Subscriber sub1=new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
System.out.println("onSubscribe done");
}
@Override
public void onNext(Integer t) {
System.out.println("Sub 1 Processing: "+t);
sleep(1000);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onComplete() { }
};
Subscriber sub2=new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
System.out.println("onSubscribe done");
}
@Override
public void onNext(Integer t) {
System.out.println("Sub2 Processing: "+t);
sleep(500);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onComplete() { }
};
// ***** Magic happens here!! *****
DispatchWorkProcessor<Integer> dwp = DispatchWorkProcessor.create(Schedulers.io());
Flowable.range(1, 20).subscribe(dwp);
dwp.subscribe(sub1);
dwp.subscribe(sub2);
// ********************************
sleep(Integer.MAX_VALUE);
}
诀窍是使用request(1)
来实现拉取方法并DispatchWorkProcessor
从流中消耗元素一次。尽管一个消费者的速度是预期的两倍,但输出是预期的:
onSubscribe done
onSubscribe done
Sub 1 Processing: 1
Sub2 Processing: 2
Sub2 Processing: 3
Sub 1 Processing: 4
Sub2 Processing: 5
Sub2 Processing: 6
Sub 1 Processing: 7
Sub2 Processing: 8
Sub2 Processing: 9
Sub 1 Processing: 10
Sub2 Processing: 11
Sub2 Processing: 12
Sub 1 Processing: 13
Sub2 Processing: 14
Sub2 Processing: 15
Sub 1 Processing: 16
Sub2 Processing: 17
Sub2 Processing: 18
Sub 1 Processing: 19
Sub2 Processing: 20
所有的流元素都处理一次,Sub 2 处理双倍的元素,这很好!
推荐阅读
- java - 如何在一个命令中编译一堆“.java”文件?
- python - how to stop this loop from repeating
- java - 从数组中提取json数据
- angular - “找不到模块'@schematics/angular/utility/config'
- android - kotlin-reflect 库导入问题
- autohotkey - 尝试修复当我按两次键时执行功能的自动热键代码
- python - Python:为什么打开 XFA pdf 文件比相同大小的 txt 文件花费更长的时间?
- embedded - 将旋转编码器与 HID 设备 (STM32) 连接
- nginx - Nginx + Gunicorn:没有根位置无法访问 Flask 应用程序
- mysql - Python MySQL 读错 Timestamp(3) 的数据