java - 使用 Spring Cloud Stream 的 Spring REST 请求响应
问题描述
我正在尝试使用 Spring 集成网关将休息端点请求/响应与 Spring 云流连接起来。下面的代码适用于第一次休息调用,但后续调用不起作用。我了解 Spring Cloud Stream 用于消息传递/异步操作。但这是一个需要请求/响应同步的实际场景。
SpringBoot应用程序
package com.example.restgateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({RestGatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class RestGatewayApplication {
interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
MessageChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload);
}
private static final String ENRICH = "enrich";
public static void main(String[] args) {
SpringApplication.run(RestGatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/uppercase/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<String> getUser(@PathVariable("string") String string) {
return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
}
}
@StreamListener(GatewayChannels.TO_UPPERCASE_REQUEST)
@SendTo(GatewayChannels.TO_UPPERCASE_REPLY)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders()).build();
}
}
应用程序.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
group: stream-to-uppercase-request
producer:
required-groups: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
group: gateway-to-uppercase-reply
producer:
required-groups: gateway-to-uppercase-reply
kafka:
binder:
brokers:
- 192.168.34.210:9092
default-binder: kafka
server:
port: 8080
解决方案
目前尚不清楚您要做什么;IntegrationFlow 正在绕过目标主题直接向输入通道发送消息;回复(交替)通过主题和replyChannel
标头发出(网关所需的将丢失,因为它不可序列化)。
它将交替工作,因为您在回复通道上有 2 个订阅者 - 网关和绑定。默认情况下,当一个频道有 2 个订阅者时,消息以循环方式分发。一条消息将发送到网关,下一条发送到绑定,等等。
推荐阅读
- reactjs - React 在提交和发送后获取更新的道具
- regex - 带有条带订阅的重力形式不起作用“值必须与正则表达式模式匹配”
- shell - 如何使用詹金斯将文件从一台从机器复制到另一台机器
- angular - ag-grid 的入门演示 - 未按预期显示网格
- c# - 具有通用初始化的单值到集合 json 转换器
- reactjs - React 构造函数只调用一次相同的组件渲染两次
- java - Linux 上的 Java:获取可用内存
- python - 在使用单个 tf 会话进行批处理构建期间读取 tfrecord
- amazon-web-services - RegionUtils.getRegionsForService("eks") 返回空列表
- oracle - 如何在 liquibase 中添加复合外键?