spring-webflux - 如何使用订阅方法在异步/反应式 Web 客户端请求中返回 Flux
问题描述
我正在使用弹簧六边形架构(端口和适配器),因为我的应用程序需要从源主题读取数据流,处理/转换数据,并将其发送到目标主题。
我的应用程序需要执行以下操作。
- 读取数据(将有回调 url)
- 使用传入数据中的 url 进行 http 调用(使用 webclient)
- 获取实际数据,需要将其转换为另一种格式。
- 将转换后的数据发送到传出主题。
这是我的代码,
public Flux<TargeData> getData(Flux<Message<EventInput>> message)
{
return message
.flatMap(it -> {
Event event = objectMapper.convertValue(it.getPayload(), Event.class);
String eventType = event.getHeader().getEventType();
String callBackURL = "";
if (DISTRIBUTOR.equals(eventType)) {
callBackURL = event.getHeader().getCallbackEnpoint();
WebClient client = WebClient.create();
Flux<NodeInput> nodeInputFlux = client.get()
.uri(callBackURL)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
List<MediaType> acceptTypes = new ArrayList<>();
acceptTypes.add(MediaType.APPLICATION_JSON);
httpHeaders.setAccept(acceptTypes);
})
.exchangeToFlux(response -> {
if (response.statusCode()
.equals(HttpStatus.OK)) {
System.out.println("Response is OK");
return response.bodyToFlux(NodeInput.class);
}
return Flux.empty();
});
nodeInputFlux.subscribe( nodeInput -> {
SourceData source = objectMapper.convertValue(nodeInput, SourceData.class);
// return Flux.fromIterable(this.TransformImpl.transform(source));
});
}
return Flux.empty();
});
}
上面代码中的注释行给出了编译,因为订阅方法不允许返回类型。
我在这里需要一个“不使用块”的解决方案。
请在这里帮助我,在此先感谢。
解决方案
我想我明白其中的逻辑。你可能想要的是这样的:
public Flux<TargeData> getData(Flux<Message<EventInput>> message) {
return message
.flatMap(it -> {
// 1. marshall and unmarshall operations are CPU expensive and could harm event loop
return Mono.fromCallable(() -> objectMapper.convertValue(it.getPayload(), Event.class))
.subscribeOn(Schedulers.parallel());
})
.filter(event -> {
// 2. Moving the if-statement yours to a filter - same behavior
String eventType = event.getHeader().getEventType();
return DISTRIBUTOR.equals(eventType);
})
// Here is the trick 1 - your request below return Flux of SourceData the we will flatten
// into a single Flux<SourceData> instead of Flux<List<SourceData>> with flatMapMany
.flatMap(event -> {
// This WebClient should not be created here. Should be a singleton injected on your class
WebClient client = WebClient.create();
return client.get()
.uri(event.getHeader().getCallbackEnpoint())
.accept(MediaType.APPLICATION_JSON)
.exchangeToFlux(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
System.out.println("Response is OK");
return response.bodyToFlux(SourceData.class);
}
return Flux.empty();
});
})
// Here is the trick 2 - supposing that transform return a Iterable of TargetData, then you should do this and will have Flux<TargetData>
// and flatten instead of Flux<List<TargetData>>
.flatMapIterable(source -> this.TransformImpl.transform(source));
}
推荐阅读
- angular - 迁移到微前端结构后,没有动态创建组件的提供者
- python - 从python中的文本文件中删除所有空行
- python - 多个条件的python函数
- wordpress - 我想为我的网站创建一个安全的支付网关
- flutter - 在颤动中生成带有图像的PDF文件
- amazon-web-services - Amazon Personalize 中的 InternalServerError:我们遇到了内部错误。请再试一次
- javascript - 如何让 chrome.webRequest.onBeforeRequest 等到收到来自 chrome.runtime.sendNativeMessage 的响应?
- python - 根据逗号拆分元素是否是Python中另一列的子字符串来合并两个数据框
- docker - 构建时无法在 docker 中获取 asdf-direnv 工作
- css - 即使我设置了媒体查询,当浏览器最小化时网站不会滚动?- CSS