multithreading - 使用 RxJava 进行线程流水线操作
问题描述
RxJava 大师,这是你大放异彩的机会!
IllegalStateException
您能否仅通过更改方法中以开头的 RxJava 管道来Flowable.generate()
确保以下程序不会抛出异常main()
?
class ExportJob {
private static Scheduler singleThread(String threadName) {
return Schedulers.from(newFixedThreadPool(1, r -> {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}));
}
public static void main(String[] args) {
Scheduler genSched = singleThread("genThread");
Scheduler mapSched = singleThread("mapThread");
// execute on "genThread"
Flowable.generate(ExportJob::infiniteGenerator)
.subscribeOn(genSched, false)
// execute on "mapThread"
.observeOn(mapSched, false)
.concatMapMaybe(ExportJob::mapping)
// execute on the thread that creates the pipeline, block it until finished
.blockingForEach(ExportJob::terminal);
}
private static int nb;
/** Must execute on "genThread" thread. */
private static void infiniteGenerator(Emitter<Integer> emitter) {
print(nb, "infiniteGenerator");
emitter.onNext(nb++);
checkCurrentThread("genThread");
}
/** Must execute on "mapThread" thread. */
private static Maybe<Integer> mapping(Integer s) {
print(s, "mapping");
checkCurrentThread("mapThread");
return Maybe.just(s);
}
/** Must execute on "terminal" thread. */
private static void terminal(Integer s) {
print(s, "terminal");
checkCurrentThread("main");
}
private static void print(int item, String method) {
System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
}
private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
String name = Thread.currentThread().getName();
if (!name.equals(expectedThreadName)) {
throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
}
}
}
解决方案
您必须使用subscribeOn(scheduler, true)
,以便将请求也路由回其预期的线程:
Flowable.generate(ExportJob::infiniteGenerator)
.subscribeOn(genSched, true) // <------------------------------
// execute on "mapThread"
.observeOn(mapSched, false)
.concatMapMaybe(ExportJob::mapping)
.subscribeOn(mapSched, true) // <------------------------------
.blockingForEach(ExportJob::terminal);
推荐阅读
- css - 如何用钩子覆盖材质 ui 样式
- angular - Angular 6 可观察和订阅
- html - Bootstrap 折叠和手风琴向侧面打开,而不是在下面
- python - 如何在 python 的 SPARQLWrapper 中创建参数化查询
- node.js - 计算nodejs中的cpu使用率
- ios - JSONSerialize 并将 JSON 发布到 Swift 中的服务器
- angular - *ngIf = "true" 应用于表单时无法访问表单数据
- mysql - 创建一个虚拟列,汇总不同表中列的值
- flutter - 在运行时构建的通用颤振应用程序中的状态管理
- html - 有没有办法在 wordpress 中查找和编辑特定的 html 片段?