java - 使用 reactor-grpc 去抖动类似的请求
问题描述
为了卸载我的数据库,我想对 gRPC 服务中的类似请求进行去抖动(例如,它们共享请求的相同 id 部分),该服务服务于对延迟没有严格要求的 API。我知道如何使用 vanilla gRPC 来做到这一点,但我确定我可以使用哪种类型的 Mono API。
直接调用 db 的 API 如下所示:
public Mono<Blob> getBlob(
Mono<MyRequest> request) {
return request.
map(reader.getBlob(request.getId()));
我有一种我应该使用的感觉,delaySubscription
但它似乎不是gRPC 服务处理groupBy
的 API 的一部分。Mono
解决方案
不使用反应式运算符来检测重复项是完全可以的:
// Guava cache as example.
private final Cache<String, Boolean> duplicatesCache = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(1))
.build();
public Mono<Blob> getBlob(Mono<MyRequest> request) {
return request.map(req -> {
var id = req.getId();
var cacheKey = extractSharedIdPart(id);
if (duplicatesCache.getIfPresent(cacheKey) == null) {
duplicatesCache.put(cacheKey, true);
return reader.getBlob(id);
} else {
return POISON_PILL; // Any object that represents debounce hit.
// Or use flatMap() + Mono.error() instead.
}
});
}
如果出于某种原因您绝对想使用反应式运算符,那么首先您需要将传入的 grpc 请求转换为Flux
. 这可以使用第三方库(如salesforce/reactive-grpc)或直接实现:
class MyService extends MyServiceGrpc.MyServiceImplBase {
private FluxSink<Tuple2<MyRequest, StreamObserver<MyResponse>>> sink;
private Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> flux;
MyService() {
flux = Flux.create(sink -> this.sink = sink);
}
@Override
public void handleRequest(MyRequest request, StreamObserver<MyResponse> responseObserver) {
sink.next(Tuples.of(request, responseObserver));
}
Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> getFlux() {
return flux;
}
}
接下来,您订阅此通量并使用您喜欢的运算符:
public static void main(String[] args) {
var mySvc = new MyService();
var server = ServerBuilder.forPort(DEFAULT_PORT)
.addService(mySvc)
.build();
server.start();
mySvc.getFlux()
.groupBy(...your grouping logic...)
.flatMap(group -> {
return group.sampleTimeout(...your debounce logic...);
})
.flatMap(...your handling logic...)
.subscribe();
}
但要注意使用groupBy与许多不同的共享 id 部分:
这些组需要在下游耗尽和消耗,groupBy 才能正常工作。值得注意的是,当标准产生大量组时,如果这些组没有被下游适当地消耗(例如,由于 flatMap 的 maxConcurrency 参数设置得太低),则可能导致挂起。