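java - How do I set up Spring Integration collaborating channel adapters using Java annotations?
Question
I have read other posts related to my problem, but none of the answers helped me solve it.
I have a Spring REST API that accepts requests. The requests are converted to XML, and I then send them to another application that accepts TCP requests.
Using TcpOutboundGateway and TcpInboundGateway works fine, but it is slow, so I want to speed it up with collaborating channel adapters and multiplexing.
The idea (as I understand it) is that a request is sent through the gateway and bridged to the aggregator; the same request is also sent by the TcpSendingMessageHandler to the other application's TCP server. The TcpReceivingChannelAdapter then listens for responses, which are sent to the aggregator, where they are correlated with their requests (by the CORRELATION_ID header), and then passed on to a transformer channel that converts the bytes to a String.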
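The correlation step described above can be sketched in plain Java, independent of Spring Integration. This is only an illustration of the idea, not the framework's aggregator: the class PairingAggregator and its add method are hypothetical names, and the sketch assumes that the request always reaches the group before its reply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for an aggregator that releases a group once it holds two messages. */
final class PairingAggregator {

    // One group of payloads per correlation ID.
    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Adds a payload to the group for the given correlation ID.
     * Returns the completed group (request first, reply second) once two
     * payloads have arrived, or an empty Optional while the group is incomplete.
     */
    synchronized Optional<List<String>> add(String correlationId, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(payload);
        if (group.size() < 2) {
            return Optional.empty();
        }
        // The group is complete: drop it so the same ID can be reused later.
        groups.remove(correlationId);
        return Optional.of(group);
    }
}
```

With several requests in flight on one connection, each reply is paired with its own request because the groups are keyed by correlation ID, not by arrival order across requests.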
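Clearly my understanding is wrong, because I cannot see how the response gets back to the gateway, and it does not work.
I can see the socket being opened, but it is closed immediately after the message is sent, so the deserializer reports an EOF: null error.
Have I set up the TcpReceivingChannelAdapter incorrectly?
How does the response get back to the gateway?
Should I use a Future as the gateway's return type?
TCP configuration: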
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpMultiPlexConfig implements ApplicationListener<TcpConnectionEvent> {

    protected final static Logger LOGGER = LoggerFactory.getLogger(TcpMultiPlexConfig.class);

    @Value("${engine.port}")
    private int port;// = 55001;

    @Value("${engine.address}")
    private String ipAddress;// = "192.168.1.1";

    @Value("${engine.timeout}")
    private int timeout;

    @Override
    public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
        TcpConnection source = (TcpConnection) tcpEvent.getSource();
        if (tcpEvent instanceof TcpConnectionOpenEvent) {
            LOGGER.info("********* Socket Opened " + source.getConnectionId());
        } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
            LOGGER.info("*********** Socket Closed " + source.getConnectionId());
        }
    }

    @MessagingGateway(defaultRequestChannel="input")
    public interface MultiPlexGateway {
        String send(@Payload String in, @Header("CORRELATION_ID") String transactionId);
    }

    // TODO the request and response are being put together
    @Bean
    @ServiceActivator(inputChannel = "input")
    public BridgeHandler bridge() {
        BridgeHandler bridge = new BridgeHandler();
        bridge.setOutputChannelName("toAggregatorClient");
        bridge.setOrder(1);
        return bridge;
    }

    @Bean
    public PublishSubscribeChannel input() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public DirectChannel toAggregatorClient() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel noResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel toTransformerClient() {
        return new DirectChannel();
    }

    @Bean
    public TcpReceivingChannelAdapter inAdapterClient() {
        TcpReceivingChannelAdapter receivingAdapter = new TcpReceivingChannelAdapter();
        receivingAdapter.setConnectionFactory(clientConnectionFactory());
        receivingAdapter.setOutputChannel(toAggregatorClient());
        receivingAdapter.setClientMode(true);
        return receivingAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "input")
    public TcpSendingMessageHandler outAdapterClient() {
        TcpSendingMessageHandler outAdapter = new TcpSendingMessageHandler();
        outAdapter.setOrder(2);
        outAdapter.setConnectionFactory(clientConnectionFactory());
        outAdapter.setClientMode(true);
        return outAdapter;
    }

    @Bean(name ="clientCFMP")
    public AbstractClientConnectionFactory clientConnectionFactory() {
        TcpNetClientConnectionFactory tcp = new TcpNetClientConnectionFactory(this.ipAddress , this.port);
        tcp.setSerializer(new DefaultSerializer()); // out
        // byte delimeter = "\n".getBytes()[0];
        // ElasticByteArrayRawSingleTerminatorSerializer deserializer = new ElasticByteArrayRawSingleTerminatorSerializer(delimeter);
        // DefaultDeserializer deserializer = new DefaultDeserializer();
        MyDefaultDeserializer deserializer = new MyDefaultDeserializer();
        tcp.setDeserializer(deserializer);
        tcp.setSoTimeout(timeout);
        tcp.setSingleUse(false);
        MapMessageConverter mc = new MapMessageConverter();
        mc.setHeaderNames("CORRELATION_ID");
        tcp.setMapper(new MessageConvertingTcpMessageMapper(mc));
        return tcp;
    }

    @MessageEndpoint
    public static class MyConverters {

        @Transformer(inputChannel="toTransformerClient", outputChannel = "resultToString")
        public byte[] getResponse(MessageGroup payload) {
            // byte[] result = null;
            List<Message<?>>list = new ArrayList<>(payload.getMessages());
            byte[] result = (byte[]) list.get(1).getPayload();
            // LOGGER.info(result);
            return result;
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            String result = new String(bytes);
            LOGGER.info("*********** RESULT => " + result);
            return result;
        }

        @ServiceActivator(inputChannel = "noResponseChannel")
        public MessageTimeoutException noResponse(String input) {
            throw new MessageTimeoutException("****** No response received for => " + input);
        }
    }

    @Bean
    @ServiceActivator(inputChannel = "toAggregatorClient", outputChannel = "toTransformerClient")
    public FactoryBean<MessageHandler> aggregatorFactoryBean() {
        AggregatorFactoryBean afb = new AggregatorFactoryBean ();
        afb.setExpireGroupsUponCompletion(true);
        afb.setExpireGroupsUponTimeout(true);
        afb.setGroupTimeoutExpression(new ValueExpression<>(this.timeout));
        afb.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("CORRELATION_ID"));
        afb.setReleaseStrategy(new MessageCountReleaseStrategy(2));
        afb.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        afb.setSendPartialResultOnExpiry(false);
        afb.setMessageStore(new SimpleMessageStore());
        afb.setDiscardChannel(noResponseChannel());
        return afb;
    }
}
Service that calls the gateway:
@Service
public class MultiPlexGatewayTransmission <T extends EngineData> extends AbstractMultiPlexEngineTransmission {

    public MultiPlexGatewayTransmission(MultiPlexGateway gateway) {
        super(gateway);
    }

    @Override
    public T request(EngineData request, Class<? extends EngineData> clazz) {
        String response = gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        if(response == null || response.isEmpty()) {
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MPGateway response ::: " + response.trim());
        }
        @SuppressWarnings("unchecked")
        T clientResponse = (T) JaxbUtils.unmarshall(response, clazz);
        if (LOGGER.isDebugEnabled()) {
            // LOGGER.debug("*** Unmarshall response ::: " + clientResponse);
        }
        return clientResponse;
    }
}
Test case:
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class ITGetClientsTest extends AbstractEngineTest {

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    // @Autowired
    // private GatewayTransmission<ClientsResponse> transmission;
    @Autowired
    private MultiPlexGatewayTransmission<ClientsResponse> transmission;

    @Test
    public void testGetClients() {
        LOGGER.info("Gateway test testGetClients... ");
        Api api = new Api();
        api.setIp("192.168.1.1");
        api.setMessageId(UUID.randomUUID().toString());
        api.setVersion("1.0");
        api.setUserToken(token);
        ClientsRequest request = new ClientsRequest();
        request.setApi(api);
        ClientsResponse response = (ClientsResponse) transmission.request(request, ClientsResponse.class);
        Assert.assertTrue(response != null);
        Assert.assertTrue(!response.getClient().isEmpty());
        LOGGER.info(Arrays.deepToString(response.getClient().toArray()));
    }
}
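Answer
I haven't looked at your code in detail; it's late and it's the weekend. But see this answer for a simpler technique that uses the inbound/outbound connection IDs to correlate requests and replies.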
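The linked answer is not reproduced here. As a general illustration of how a reply that arrives on a separate receiving thread can be handed back to the caller that is waiting for it (the "how does the response get back" and "should I use a Future" parts of the question), here is a minimal plain-JDK sketch. The class PendingReplies and its methods are hypothetical names, not Spring Integration API, and the sketch assumes each reply carries the same correlation ID as its request.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that matches asynchronous replies to waiting callers. */
final class PendingReplies {

    // One future per in-flight request, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before sending the request; the caller then waits on the returned future. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Call from the receiving side when a reply arrives.
     * Returns false if no caller is waiting for this correlation ID,
     * for example because the caller already timed out.
     */
    boolean complete(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Call when the caller gives up waiting, so the entry does not leak. */
    void cancel(String correlationId) {
        pending.remove(correlationId);
    }
}
```

In this sketch the sending side calls register, writes the request to the shared connection, and then blocks on the future with a timeout; the thread that reads from the socket calls complete. Replies may arrive in any order, which is what allows several requests to share one connection.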