首页 > 解决方案 > 如何在 Reactor 中正确调用返回未来的方法

问题描述

为了防止 XY 问题,我将从头开始:
我有一个非阻塞 SOAP 客户端,我将其包装为返回类型Mono<T>(默认情况下它接受回调。如果需要,我可以详细说明)。

现在我想做(给定 ID):
1. 通过 ID 获取代码
2. 对代码做一些事情
3. 之后,获取FooBar创建FooBar
我写的是:

public class MyService {

    private final MySoapClient soapClient;

    public Mono<FooBarDto> doSomething(String id) {
        return Mono.just(id)
                .flatMap(soapClient::getCode) // returns Mono<String>
                .flatMap(code ->
                        soapClient.doSomething(code) // returns Mono<Void>
                                .then(getFooBar(id, code))); // See this
    }

    private Mono<FooBarDto> getFooBar(String id, String code) {
        return Mono.zip(
                soapClient.getFoo(code), // returns Mono<Foo>
                soapClient.getBar(code) // returns Mono<Bar>
        ).map(tuple2 -> toFooBarDto(id, tuple2));
    }

    private FooBarDto toFooBarDto(String id, Tuple2<Foo, Bar> tuple2) {
        return FooBarDto.builder()/* set properties */.build();
    }

}

现在的问题是,因为 SOAP 客户端的方法不是惰性的(在您调用它们的那一刻,它们开始了进程),所以then这里的语义不起作用。意思是我想得到什么Foo时候Bar完成doSomething。他们都是一起开始的。
我试图通过更改then为来修复它flatMap,但让它变得更糟。getFooBar从来没有被叫过。(1.有人可以解释为什么吗?)。

所以我最终做的是再次包装 SOAP 调用以使它们变得懒惰:

public class MySoapClient {
    private final AutoGeneratedSoapClient client;

    Mono<Foo> getFoo(GetFooRequest request) {
        return Mono.just(request).flatMap(this::doGetMsisdnByIccid);
    }

    private Mono<Foo> doGetFoo(GetFooRequest request) {
        val handler = new AsyncHandler<GetFooRequest>();
        client.getFoo(request, handler);
        return Mono.fromFuture(handler.future);
    }

    private static class AsyncHandler<T> implements javax.xml.ws.AsyncHandler<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        @Override
        public void handleResponse(Response<T> res) {
            try {
                future.complete(res.get());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        }
    }
}

有没有更好的方法呢?具体来说:
2. 使用CompeletableFuture和回调。
3. 使 SOAP 客户端中的方法变得惰性。

标签: project-reactor

解决方案


我试图通过更改它来修复它,然后更改为 flatMap,但让它变得更糟。getFooBar 从未被调用过。(1.有人可以解释为什么吗?)

我认为 aMono<Void>总是完成空(或错误),因此永远不会调用后续的 flatMap 。

  1. 使用 CompletableFuture 和回调。
  2. 在 SOAP 客户端中使方法变得惰性。

要使呼叫变得懒惰,您可以执行以下操作之一:

1、您可以使用Mono.fromFuture哪个接受供应商:

private Mono<Foo> doGetFoo(GetFooRequest request) {
    return Mono.fromFuture(() -> {
        val handler = new AsyncHandler<GetFooRequest>();
        client.getFoo(request, handler);
        return handler.future;
    });
}

2,您可以使用Mono.defer

private Mono<Foo> doGetFoo(GetFooRequest request) {
    return Mono.defer(() -> {
        val handler = new AsyncHandler<GetFooRequest>();
        client.getFoo(request, handler);
        return Mono.fromFuture(handler.future);
    });
}

3,您可以摆脱 CompletableFuture 并Mono.create改用,如下所示:

private Mono<Foo> doGetFoo(GetFooRequest request) {
    return Mono.create(sink -> {
        AsyncHandler<Foo> handler = response ->
        {
            try
            {
                sink.success(response.get());
            } catch (Exception e)
            {
                sink.error(e);
            }
        };

        client.getFoo(request, handler);
    });
}

如果您执行其中任何一项操作,那么使用then方法将是安全的,并且它将按预期工作。


推荐阅读