首页 > 解决方案 > Spring Integration Java DSL HTTP在超时错误内没有收到回复

问题描述

我正在使用 Spring 集成 5.0.6。我已经完成了它的文档并创建了以下代码,该代码在 HTTP 端点上侦听并发布到 kafka 主题。

一切正常,我也在主题上收到消息。但是在 HTTP 客户端没有发送回复,它给出“在超时内没有收到回复”。

如何在以下代码中向 http 调用者发送回复:

@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class).replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
            ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {

        return Kafka.outboundChannelAdapter(producerFactory)
                .messageKey("key").headerMapper(mapper())
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
    }

谢谢你的帮助。

标签: javaspring-integrationspring-integration-dslspring-integration-http

解决方案


您使用 one-way 的问题Kafka.outboundChannelAdapter(producerFactory)。这仅适用于“发送和忘记”。

如果您有兴趣生成一些后续流程,或者只是需要回复 HTTP 请求,您应该考虑使用:

/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link IntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {

在流程定义中,您的第一个订阅者实际上是那个 Kafka.outboundChannelAdapter(producerFactory),而第二个订阅者可以是上面提到的那个.enrichHeaders()。如果您什么都不做,最后一个会将其结果发送到replyChannel标头中,因此将到达 HTTP 响应。

在这个发布-订阅场景中,您应该记住,payload第二个订阅者的订阅将​​与您尝试发送到 Kafka 的相同。


推荐阅读