java - 然后和最后使用 Flowable 反应式 x Java
问题描述
尝试使用 Flowable,然后使用 RxJava3。
public String post(Publisher<CompletedFileUpload> files) {
return Flowable.fromPublisher(files).doOnNext(file -> {
MultipartBody requestBody = MultipartBody.builder()
.addPart("file", file.getFilename(), MediaType.MULTIPART_FORM_DATA_TYPE, file.getBytes())
.addPart("id", "asdasdsds")
.build();
}).doOnComplete((value) -> {
return this.iProduct.post(requestBody);
});
}
上面的代码有错误,但我想要实现的在下面的场景中描述
- 迭代文件
- 将 file.getFilename() 和字节添加到 requestBody
- 然后调用返回字符串的 this.iProduct.post(requestBody)
- 最后返回字符串值
解决方案
解决此问题的一种方法是:
- 收集运营商可能产生
Publisher<CompletedFileUpload> files
的所有排放物toList()
- 通过使用运算符循环在步骤 1 中创建的列表来构造请求
map()
。 - 发布请求并返回结果字符串(也使用
map()
operator.
这个脚手架看起来像这样:
public String post(Publisher<CompletedFileUpload> files) {
final Single<MultipartBody> requestSingle =
Flowable.fromPublisher(files)
.toList()
.map(list -> {
final MultipartBody.Builder builder = MultipartBody.Builder();
for(file : list) {
builder.addPart(...)
}
return builder.build();
})
.map(requestBody -> this.iProduct.post(requestBody));
return requestSingle.blockingGet();
}
这里有两点值得注意:
toList()
运算符将转换Flowable
为Single
。- 您的示例混合了异步代码(所有 Rx 内容)和同步代码(post 方法返回 a
String
而不是延迟操作/值)。Rx 运算符是从一种反应类型转换为另一种反应类型的有用方法,但在您的情况下,您需要一种通过调用这些异步操作并等待结果值来桥接到同步世界的方法。这就是最后调用的原因blockingGet()
。
推荐阅读
- python - 在 Python 中遍历 YAML
- javascript - React.JS - 根据另一个输入更改输入
- mongodb - 我收到此错误:错误 [ERR_HTTP_HEADERS_SENT]:将标头发送到客户端后无法设置标头
- javascript - React 无法正确处理计数器
- python - 使用 np.where 过滤行的问题
- google-text-to-speech - Invoke-WebRequest :远程服务器返回错误:(403) Forbidden
- c - 递归搜索具有特定扩展名的文件...C语言
- spring-boot - Spring Boot + Websocket + Stomp 在服务器空闲时抛出异常
- python - 配置缩放轴的距离
- r - 为数据集定义病例对照研究设计