java - Spring Integration Java DSL HTTP在超时错误内没有收到回复
问题描述
我正在使用 Spring 集成 5.0.6。我已经完成了它的文档并创建了以下代码,该代码在 HTTP 端点上侦听并发布到 kafka 主题。
一切正常,我也在主题上收到消息。但是在 HTTP 客户端没有发送回复,它给出“在超时内没有收到回复”。
如何在以下代码中向 http 调用者发送回复:
@Bean
public DirectChannel replyChannel() {
return new DirectChannel();
}
@Bean(name = "restInputFlow")
public IntegrationFlow send() {
return IntegrationFlows
.from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class).replyChannel(replyChannel()))
.transform(new Transformer())
.handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
.enrichHeaders(
c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
.get();
}
private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {
return Kafka.outboundChannelAdapter(producerFactory)
.messageKey("key").headerMapper(mapper())
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
谢谢你的帮助。
解决方案
您使用 one-way 的问题Kafka.outboundChannelAdapter(producerFactory)
。这仅适用于“发送和忘记”。
如果您有兴趣生成一些后续流程,或者只是需要回复 HTTP 请求,您应该考虑使用:
/**
* The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
* method specific implementation to allow the use of the 'subflow' subscriber capability.
* @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
* {@link PublishSubscribeSpec} options including 'subflow' definition.
* @return the current {@link IntegrationFlowDefinition}.
*/
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {
在流程定义中,您的第一个订阅者实际上是那个 Kafka.outboundChannelAdapter(producerFactory)
,而第二个订阅者可以是上面提到的那个.enrichHeaders()
。如果您什么都不做,最后一个会将其结果发送到replyChannel
标头中,因此将到达 HTTP 响应。
在这个发布-订阅场景中,您应该记住,payload
第二个订阅者的订阅将与您尝试发送到 Kafka 的相同。