首页 > 解决方案 > 然后和最后使用 Flowable 反应式 x Java

问题描述

尝试使用 Flowable,然后使用 RxJava3。

public String post(Publisher<CompletedFileUpload> files) {
    return Flowable.fromPublisher(files).doOnNext(file -> {
        MultipartBody requestBody = MultipartBody.builder()
                .addPart("file", file.getFilename(), MediaType.MULTIPART_FORM_DATA_TYPE, file.getBytes())
                .addPart("id", "asdasdsds")
                .build();
    }).doOnComplete((value) -> {
        return this.iProduct.post(requestBody);
    });
}

上面的代码有错误,但我想要实现的在下面的场景中描述

  1. 迭代文件
  2. 将 file.getFilename() 和字节添加到 requestBody
  3. 然后调用返回字符串的 this.iProduct.post(requestBody)
  4. 最后返回字符串值

标签: javarx-javarx-java2reactivexrx-java3

解决方案


解决此问题的一种方法是:

  1. 收集运营商可能产生Publisher<CompletedFileUpload> files的所有排放物toList()
  2. 通过使用运算符循环在步骤 1 中创建的列表来构造请求map()
  3. 发布请求并返回结果字符串(也使用map()operator.

这个脚手架看起来像这样:

public String post(Publisher<CompletedFileUpload> files) {
    final Single<MultipartBody> requestSingle = 
        Flowable.fromPublisher(files)
            .toList()
            .map(list -> {
                final MultipartBody.Builder builder = MultipartBody.Builder();

                for(file : list) {
                    builder.addPart(...)
                }

                return builder.build();
            })
            .map(requestBody -> this.iProduct.post(requestBody));

    return requestSingle.blockingGet();
}

这里有两点值得注意:

  • toList()运算符将转换FlowableSingle
  • 您的示例混合了异步代码(所有 Rx 内容)和同步代码(post 方法返回 aString而不是延迟操作/值)。Rx 运算符是从一种反应类型转换为另一种反应类型的有用方法,但在您的情况下,您需要一种通过调用这些异步操作并等待结果值来桥接到同步世界的方法。这就是最后调用的原因blockingGet()

推荐阅读