首页 > 解决方案 > 使用 reactor-grpc 去抖动类似的请求

问题描述

为了卸载我的数据库,我想对 gRPC 服务中的类似请求进行去抖动(例如,它们共享请求的相同 id 部分),该服务服务于对延迟没有严格要求的 API。我知道如何使用 vanilla gRPC 来做到这一点,但我确定我可以使用哪种类型的 Mono API。

直接调用 db 的 API 如下所示:

public Mono<Blob> getBlob(
      Mono<MyRequest> request) {
    return request.
       map(reader.getBlob(request.getId()));

我有一种我应该使用的感觉,delaySubscription但它似乎不是gRPC 服务处理groupBy的 API 的一部分。Mono

标签: javagrpcproject-reactor

解决方案


不使用反应式运算符来检测重复项是完全可以的:

// Guava cache as example.
private final Cache<String, Boolean> duplicatesCache = CacheBuilder.newBuilder()
    .expireAfterWrite(Duration.ofMinutes(1))
    .build();

public Mono<Blob> getBlob(Mono<MyRequest> request) {
    return request.map(req -> {
        var id = req.getId();
        var cacheKey = extractSharedIdPart(id);
        if (duplicatesCache.getIfPresent(cacheKey) == null) {
            duplicatesCache.put(cacheKey, true);
            return reader.getBlob(id);
        } else {
            return POISON_PILL; // Any object that represents debounce hit.
                                // Or use flatMap() + Mono.error() instead.
        }
    });
}

如果出于某种原因您绝对想使用反应式运算符,那么首先您需要将传入的 grpc 请求转换为Flux. 这可以使用第三方库(如salesforce/reactive-grpc)或直接实现:


class MyService extends MyServiceGrpc.MyServiceImplBase {

    private FluxSink<Tuple2<MyRequest, StreamObserver<MyResponse>>> sink;

    private Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> flux;

    MyService() {
        flux = Flux.create(sink -> this.sink = sink);
    }


    @Override
    public void handleRequest(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        sink.next(Tuples.of(request, responseObserver));
    }

    Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> getFlux() {
        return flux;
    }
}

接下来,您订阅此通量并使用您喜欢的运算符:

public static void main(String[] args) {
    var mySvc = new MyService();
    var server = ServerBuilder.forPort(DEFAULT_PORT)
        .addService(mySvc)
        .build();
    server.start();
    mySvc.getFlux()
        .groupBy(...your grouping logic...)
        .flatMap(group -> {
            return group.sampleTimeout(...your debounce logic...);
        })
        .flatMap(...your handling logic...)
        .subscribe();
}

但要注意使用groupBy与许多不同的共享 id 部分:

这些组需要在下游耗尽和消耗,groupBy 才能正常工作。值得注意的是,当标准产生大量组时,如果这些组没有被下游适当地消耗(例如,由于 flatMap 的 maxConcurrency 参数设置得太低),则可能导致挂起。


推荐阅读