rx-java2 - Vert.x Kafka 不遵守 RxJava 线程分配?
问题描述
给定以下代码:
kafkaConsumer
.rxSubscription()
.subscribeOn(Schedulers.io())
.map(s -> {
logger.info("Mapping on Thread: " + Thread.currentThread().getName());
return s;
})
.observeOn(Schedulers.computation())
.subscribe(
set -> {
logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
});
其中 kafkaConsumer 是 Vert.x KafkaConsumer,我希望
.map(s -> {
logger.info("Mapping on Thread: " + Thread.currentThread().getName());
return s;
})
将发生在 Reactive IO 线程上。但是,它在 Vert.x 事件循环线程上执行。当我运行以下测试类时,相同的场景按预期在 IO 线程上运行 map 方法。
public class ThreadTesting {
public static void main(String args[]) {
Vertx vertx = Vertx.vertx();
Observable.fromArray(new String[] {"start"})
.flatMapSingle(s -> method1())
.subscribeOn(Schedulers.io())
.map(
s -> {
System.out.println("mapping 2 on Thread: " + Thread.currentThread().getName());
return s.concat(method2());
})
.observeOn(Schedulers.computation())
.subscribe(
str -> {
System.out.println("Subscribing on Thread: " + Thread.currentThread().getName());
},
onError -> {
onError.printStackTrace();
});
}
public static Single<String> method1() {
System.out.println("Executing method 1 on Thread: " + Thread.currentThread().getName());
AsyncResultSingle<String> vertxSingle = new AsyncResultSingle<>(
h -> {
h.handle(Future.succeededFuture("method 1 string"));
});
return vertxSingle;
}
public static String method2() {
System.out.println("Executing method 2 on Thread: " + Thread.currentThread().getName());
return "method 2 String";
}
}
是什么导致线程执行中的这种差异发生?
解决方案
Vert.xKafkaConsumer
在事件循环线程上异步发出项目,即使您在io
调度程序上订阅了它。
在您的代码段中,您尝试强制在computation
调度程序上发出项目。它有效,但不适用于您期望的可观察对象:它适用于操作返回的可观察对象map
。
如果要map
对computation
调度器进行操作,需要先应用操作observeOn
符:
kafkaConsumer
.rxSubscription()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(s -> {
logger.info("Mapping on Thread: " + Thread.currentThread().getName());
return s;
})
.subscribe(
set -> {
logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
});
推荐阅读
- reactjs - 在同一级别的 key-prop-problem 上反应多个映射
- r - 使用变量对 data.table 进行子集化(当 varname 与 colname 相同时)
- html - 如何使子div高度与父div高度相同
- perl - 使用 perl 脚本在目录中搜索文本模式
- java - 找不到 ComboPooledDataSource 的类型
- docker - Error During Bazel Build of Tensorflow (Serving tutorial): Ubuntu
- fxml - 如何将 fxml 文件绑定到 scalafx 中的自定义对话框中?
- ruby-on-rails - Rails 5 中使用 or 方法的动态作用域
- django - 在 Django 中使用现有用户创建从用户继承的对象
- android - How @OnLifecycleEvent annotated methods from Architecture Components get hooked up with the LifecycleOwner?