首页 > 解决方案 > 如何为 Webflux 应用程序包装阻塞 IO 操作

问题描述

我有一个 Spring Boot/Webflux 应用程序。我需要一个异步 REST 端点,它可以:

  1. 生成随机作业 ID。
  2. 通过 HTTP 调用一项服务。
  3. 通过 HTTP 调用另一个服务。
  4. 合并来自服务的响应并将结果写入文件。
  5. 将作业 ID 返回给客户端。

端点应该是异步的。这就是为什么客户不应该等待步骤的结果:2,3,4。客户端应立即收到作业 ID。

现在我有以下实现:

@Override
  public Mono<String> saveData() {
    String jobId = UUID.randomUUID().toString();
    Mono<ResponseFromService1> response1 = service1.getData();
    Mono<ResponseFromService2> response2 = service2.getData();
    return fileService.saveData(response1, response2)
        .map(filePath -> log.info("File has been stored at {}", filePath))
        .map(jobId);

service1 和 service2 是使用响应式 WebClient 实现的。fileService.saveData 的实现如下所示:

  public Mono<Path> saveDataInFile(Mono<ResponseFromService1> response1,Mono<ResponseFromService2> response2) {
return Mono.fromCallable(() ->
    Mono.zip(response1, response2)
        .map(tuple -> blockingIOsaveMethod(tuple.getT1(), tuple.getT2()))
).publishOn(Schedulers.elastic())
    .flatMap(mono -> mono);
  }

问题是这个端点不是异步的。保存数据文件后,端点的客户端获取作业 ID。我应该如何更新 saveDataInFile 和 saveData 以立即返回作业 ID?

标签: javareactive-programmingspring-webfluxproject-reactornonblocking

解决方案


客户端应立即收到作业 ID。

这似乎表明Mono<String>不是正确的返回类型saveData(),因为您显然不想等待任何异步操作完成:

@Override
public String saveData() {
  String jobId = UUID.randomUUID().toString();
  // ...
  return jobId;
}

看起来像“即...发即弃”的操作,您也许可以手动订阅Mono

@Override
public String saveData() {
  String jobId = UUID.randomUUID().toString();
  fileService.saveData(...).subscribe(...); // look at the different overloads of #subscribe(...)
  return jobId;
}

确保所有内容都记录良好,这样您就不会忘记返回 HTTP 响应后发生的情况。


推荐阅读