spring-integration - spring-integration:如何将延迟的详细信息作为 SSE 交付
问题描述
我有一个我想尽快检索并返回的项目列表。
对于每个项目,我还需要检索详细信息,它们可能会在几秒钟后返回。
我当然可以使用 HTTP 网关创建两个不同的路由,并首先请求列表,然后请求详细信息。但是,我必须等到所有细节都到达。我想立即发回列表,然后在收到详细信息后立即发回。
更新
按照 Artem Bilan 的建议,我的流程返回一个 Flux 作为有效负载,它将项目列表合并为 Mono,将处理的项目合并为 Flux。
请注意,下面的示例通过调用模拟项目的详细处理toUpperCase
;我的真实用例需要路由和拨出电话以获取每个项目的详细信息:
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/strings/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
.mappedResponseHeaders("*"))
.enrichHeaders(Collections.singletonMap("aHeader", new String[]{"foo", "bar"}))
.transform("headers.aHeader")
.<String[]>handle((p, h) -> {
return Flux.merge(
Mono.just(p),
Flux.fromArray(p)
.map(t -> {
return t.toUpperCase();
// return detailsResolver.resolveDetail(t);
}));
})
.get();
}
这更接近我的目标。当我使用 curl 从该流中请求数据时,我会立即获得项目列表,稍后会获得已处理的项目:
λ curl http://localhost:8080/strings/sse
data:["foo","bar"]
data:FOO
data:BAR
虽然简单地将字符串转换为大写可以正常工作,但我很难使用WebFlux.outboundGateway
. detailsResolver
上面注释掉的代码中的定义如下:
@MessagingGateway
public interface DetailsResolver {
@Gateway(requestChannel = "itemDetailsFlow.input")
Object resolveDetail(String item);
}
@Bean
IntegrationFlow itemDetailsFlow() {
return f -> f.handle(WebFlux.<String>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:3003/rest/path/")
.path(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(JsonNode.class)
.replyPayloadToFlux(false));
}
当我在detailsResolver
通话中发表评论并发表评论时t.toUpperCase
,outboundGateway
似乎已正确设置(日志显示订阅者存在,需求已发出信号)但从未得到响应(未达到断点ExchangeFunctions.exchange#91
)。
DetailsResolver
我通过从上下文中获取它作为 bean 并调用它的方法来确保它本身正在工作 - 这给了我一个 JsonNode 响应。
可能是什么原因?
解决方案
是的,我不会toReactivePublsiher()
在那里使用,因为您有当前请求的上下文。您需要每个请求的助焊剂。我会使用类似的东西Flux.merge(Publisher<? extends I>... sources)
,其中第一个Flux
用于项目,第二个用于每个项目的详细信息(类似于Tuple2
)。
为此,你真的可以使用这样的东西:
IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
并且您的下游流应该Flux
作为回复的有效负载产生。
我在测试用例中有这样的样本:
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
.mappedResponseHeaders("*"))
.enrichHeaders(Collections.singletonMap("aHeader", new String[] { "foo", "bar", "baz" }))
.handle((p, h) -> Flux.fromArray((String[]) h.get("aHeader")))
.get();
}
推荐阅读
- apache-kafka - docker中的本地kafka,具有事务支持
- arrays - C如何解析char数组中的int和char?
- c# - 要实施哪些 Google API 来访问不同云端硬盘位置的文件?
- python - 使用从其他脚本导入函数的 .bat 运行 python 脚本
- python - 在反汇编的一条指令中创建空集
- php - 使用 PHP,我如何在我知道关键的元素之前获取元素
- java - 无法确定任务“:app:processDebugManifest”的依赖关系
- c - 为什么我的 hello world 二进制文件大多为零?
- javascript - 如何将 Fetch 响应返回到可用变量中
- javascript - 使用 JavaScript 获得鼠标移动的差异