首页 > 解决方案 > Vert.x Kafka 不遵守 RxJava 线程分配?

问题描述

给定以下代码:

kafkaConsumer
  .rxSubscription()
  .subscribeOn(Schedulers.io())
  .map(s -> {
    logger.info("Mapping on Thread: " + Thread.currentThread().getName());
    return s;
   })
  .observeOn(Schedulers.computation())
  .subscribe(
     set -> {
       logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
   });

其中 kafkaConsumer 是 Vert.x KafkaConsumer,我希望

.map(s -> {
  logger.info("Mapping on Thread: " + Thread.currentThread().getName());
  return s;
})

将发生在 Reactive IO 线程上。但是,它在 Vert.x 事件循环线程上执行。当我运行以下测试类时,相同的场景按预期在 IO 线程上运行 map 方法。

public class ThreadTesting {

public static void main(String args[]) {
  Vertx vertx = Vertx.vertx();
  Observable.fromArray(new String[] {"start"})
    .flatMapSingle(s -> method1())
    .subscribeOn(Schedulers.io())
    .map(
        s -> {
          System.out.println("mapping 2 on Thread: " + Thread.currentThread().getName());
          return s.concat(method2());
        })
    .observeOn(Schedulers.computation())
    .subscribe(
        str -> {
          System.out.println("Subscribing on Thread: " + Thread.currentThread().getName());
        },
        onError -> {
          onError.printStackTrace();
        });
 }

 public static Single<String> method1() {
   System.out.println("Executing method 1 on Thread: " + Thread.currentThread().getName());
   AsyncResultSingle<String> vertxSingle = new AsyncResultSingle<>(
      h -> {
         h.handle(Future.succeededFuture("method 1 string"));
      });
    return vertxSingle;
 }

 public static String method2() {
   System.out.println("Executing method 2 on Thread: " + Thread.currentThread().getName());
   return "method 2 String";
 }
}

是什么导致线程执行中的这种差异发生?

标签: rx-java2vert.xreactivex

解决方案


Vert.xKafkaConsumer在事件循环线程上异步发出项目,即使您在io调度程序上订阅了它。

在您的代码段中,您尝试强制在computation调度程序上发出项目。它有效,但不适用于您期望的可观察对象:它适用于操作返回的可观察对象map

如果要mapcomputation调度器进行操作,需要先应用操作observeOn符:

kafkaConsumer
  .rxSubscription()
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.computation())
  .map(s -> {
    logger.info("Mapping on Thread: " + Thread.currentThread().getName());
    return s;
   })
  .subscribe(
     set -> {
       logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
   });

推荐阅读