首页 > 解决方案 > spring-integration:如何将延迟的详细信息作为 SSE 交付

问题描述

我有一个我想尽快检索并返回的项目列表。

对于每个项目,我还需要检索详细信息,它们可能会在几秒钟后返回。

我当然可以使用 HTTP 网关创建两个不同的路由,并首先请求列表,然后请求详细信息。但是,我必须等到所有细节都到达。我想立即发回列表,然后在收到详细信息后立即发回。

更新

按照 Artem Bilan 的建议,我的流程返回一个 Flux 作为有效负载,它将项目列表合并为 Mono,将处理的项目合并为 Flux。

请注意,下面的示例通过调用模拟项目的详细处理toUpperCase;我的真实用例需要路由和拨出电话以获取每个项目的详细信息:

    @Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/strings/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
                    .mappedResponseHeaders("*"))
            .enrichHeaders(Collections.singletonMap("aHeader", new String[]{"foo", "bar"}))
            .transform("headers.aHeader")
            .<String[]>handle((p, h) -> {
                return Flux.merge(
                        Mono.just(p),
                        Flux.fromArray(p)
                                .map(t -> {
                                    return t.toUpperCase();
                                    // return detailsResolver.resolveDetail(t);
                                }));
            })
            .get();
}

这更接近我的目标。当我使用 curl 从该流中请求数据时,我会立即获得项目列表,稍后会获得已处理的项目:

λ curl http://localhost:8080/strings/sse
data:["foo","bar"]

data:FOO

data:BAR

虽然简单地将字符串转换为大写可以正常工作,但我很难使用WebFlux.outboundGateway. detailsResolver上面注释掉的代码中的定义如下:

@MessagingGateway
public interface DetailsResolver {

    @Gateway(requestChannel = "itemDetailsFlow.input")
    Object resolveDetail(String item);

}

@Bean
IntegrationFlow itemDetailsFlow() {
    return f -> f.handle(WebFlux.<String>outboundGateway(m ->
            UriComponentsBuilder.fromUriString("http://localhost:3003/rest/path/")
                    .path(m.getPayload())
                    .build()
                    .toUri())
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(JsonNode.class)
            .replyPayloadToFlux(false));
}

当我在detailsResolver通话中发表评论并发表评论时t.toUpperCaseoutboundGateway似乎已正确设置(日志显示订阅者存在,需求已发出信号)但从未得到响应(未达到断点ExchangeFunctions.exchange#91)。

DetailsResolver我通过从上下文中获取它作为 bean 并调用它的方法来确保它本身正在工作 - 这给了我一个 JsonNode 响应。

可能是什么原因?

标签: spring-integrationserver-sent-eventsspring-integration-dsl

解决方案


是的,我不会toReactivePublsiher()在那里使用,因为您有当前请求的上下文。您需要每个请求的助焊剂。我会使用类似的东西Flux.merge(Publisher<? extends I>... sources),其中第一个Flux用于项目,第二个用于每个项目的详细信息(类似于Tuple2)。

为此,你真的可以使用这样的东西:

  IntegrationFlows
                .from(WebFlux.inboundGateway("/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且您的下游流应该Flux作为回复的有效负载产生。

我在测试用例中有这样的样本:

    @Bean
    public IntegrationFlow sseFlow() {
        return IntegrationFlows
                .from(WebFlux.inboundGateway("/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
                        .mappedResponseHeaders("*"))
                .enrichHeaders(Collections.singletonMap("aHeader", new String[] { "foo", "bar", "baz" }))
                .handle((p, h) -> Flux.fromArray((String[]) h.get("aHeader")))
                .get();
    }

推荐阅读