首页 > 解决方案 > Guava Futures.transform 与“Real” Executor 与 transformAsync 与 DirectExecutor

问题描述

假设我有两个重量级 IO 阻塞操作,findMatchInSomeDB() 和 getDetailsFromOtherDB(String objectKey)。此外,我希望它们在后台运行,并使用 Guava Futures 将它们链接在一起,因为一个依赖于另一个的结果(我知道这可以在一个简单的 Callable 中按顺序完成,但为了说明目的保持简单)

consumeChain下面的和方法之间是否有任何实际或细微的区别consumeChainAsync

import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class Consumer
{
   private final Retriever retriever;
   private final ListeningExecutorService executorSvc;

   public Consumer(Retriever retriever, ListeningExecutorService executorSvc)
   {
      this.retriever = retriever;
      this.executorSvc = executorSvc;
   }

   private void consumeChain(String searchCriteria) throws Exception
   {
      ListenableFuture<String> futureMatch = executorSvc.submit(
         () -> retriever.findMatchInSomeDB(searchCriteria));

      Function<String, DataObject> keyToDataObj = objectKey ->
         retriever.getDetailsFromOtherDB(objectKey);

      // using "real" executor service so transform function runs
      // in the background
      ListenableFuture<DataObject> futureDataObj = Futures.transform(
         futureMatch, keyToDataObj, executorSvc);

      // ...do some other stuff in this thread...

      // do something with futureDataObj result
      futureDataObj.get();
   }

   private void consumeChainAsync(String searchCriteria) throws Exception
   {
      ListenableFuture<String> futureMatch = executorSvc.submit(
         () -> retriever.findMatchInSomeDB(searchCriteria));

      AsyncFunction<String, DataObject> keyToDataObj = objectKey ->
      {
         return executorSvc.submit(
            () -> retriever.getDetailsFromOtherDB(objectKey));
      };

      // using DirectExecutor because the transform function
      // only creates and submits a Callable
      ListenableFuture<DataObject> futureDataObj = Futures.transformAsync(
         futureMatch, keyToDataObj, MoreExecutors.directExecutor());

      // ...do some other stuff in this thread...

      // do something with futureDataObj
      futureDataObj.get();
   }
}

据我所知,两者都将通过 运行每个重量级操作executorSvc,并且都将传播取消和/或执行失败。

似乎transformAsync(而不是仅transform与 DirectExecutor 以外的执行程序一起使用)的唯一目的是当您使用返回 ListenableFuture 而不是直接运行操作的 API 时。我错过了什么吗?

标签: javamultithreadingasynchronousguava

解决方案


似乎transformAsync(而不是仅transform与 DirectExecutor 以外的执行程序一起使用)的唯一目的是当您使用返回 ListenableFuture 而不是直接运行操作的 API 时。

我认为这就是想法。

但是,我可以想到一个稍微好一点的小区别transformAsync:如果您调用cancel(true)output Futuretransform当前不会中断正在运行的线程getDetailsFromOtherDB。相反,transformAsync将(通过调用cancel(true)ListenableFuture返回的ListeningExecutorService)。transform 应该传播中断,但是要做到这一点有一些微妙之处,在上面的链接中有所介绍。


推荐阅读