首页 > 解决方案 > 带有入站通道和响应通道的 TCP 客户端的动态生成

问题描述

我是 Spring 集成的新手。

使用 Spring 4,只有 java 注释。

我现在正在工作的项目我们在属性文件中设置了 tcp 连接。

目前它被硬编码为只有 2 个不同的连接,并且必须更改为更动态的方法,我们可以在属性文件中设置可变数量的连接,并能够在运行时添加新连接。

我知道动态 tcp 客户端示例的存在,并试图以此为基础。

首先,我们为连接设置以下 bean:

@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
  final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
  env.getProperty("socket.tcp.nodes[0].ip"), 
  env.getProperty("socket.tcp.nodes[0].port", Integer.class)
  );

  tcpNetClientConnectionFactory.setSingleUse(false);
  tcpNetClientConnectionFactory.setSoKeepAlive(true);

  final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);

  tcpNetClientConnectionFactory.setSerializer(by);
  tcpNetClientConnectionFactory.setDeserializer(by);
  return tcpNetClientConnectionFactory;
}

然后我们有等待发送东西的适配器:

@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
        @Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
    final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(connectionFactory);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    return adapter;
}

当调用 fromTcp() 时,它会转换消息,然后以下代码将其发送到另一个应用程序以进行进一步处理。

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

处理消息后,我们必须发送响应。

@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
        final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
    final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
    tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
    return tcpSendingMsgHandler;
}

并使用网关:

@MessagingGateway()
public interface MessageTcpGateway {

  @Gateway(requestChannel = "toTcpCh01")
  ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}

我们把它寄回去。

通过该示例,我可以了解如何为响应动态创建流。

但是我无法理解如何创建一个公共连接池,然后根据这些连接工厂动态创建侦听适配器和响应适配器,然后在运行时关闭/删除它们。

由于这个问题,我有点了解如何使用入站适配器进行流程

我是否需要为每个适配器创建多个单独的 IntegrationFlow?所以所有的调用和响应都可以异步处理(我可能对异步有误)

然后在想要关闭连接时单独处理它们?像调用接近 TcpReceivingChannelAdapter 然后调用 TcpSendingMessageHandler 最后注销connectonfactory?

标签: javaspring-bootspring-integrationspring-integration-dsl

解决方案


我不认为对于协作通道适配器,您需要单独IntegrationFlow定义TcpReceivingChannelAdapterand TcpSendingMessageHandler。它真的可以作为一个IntegrationFlow从 开始TcpReceivingChannelAdapter并以 . 结束TcpSendingMessageHandler。关键是它IntegrationFlow本身只是一个对组件引用进行分组的逻辑容器。艰苦的工作实际上是由您在此处声明的所有组件完成的,并且通过此TcpReceivingChannelAdapterTcpSendingMessageHandler和之间的网关,您将真正实现异步。

请记住,它ByteArrayLengthHeaderSerializer也必须被声明为 bean。不确定每个动态流是否需要一个单独的实例,但无论如何,这里有一个 API 可以做到这一点:

    /**
     * Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
     * application context. Usually it is some support component, which needs an application context.
     * For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
     * @param bean an additional arbitrary bean to register into the application context.
     * @return the current builder instance
     */
    IntegrationFlowRegistrationBuilder addBean(Object bean);

推荐阅读