首页 > 解决方案 > 如何使用 Java 注释设置 Spring Integration Collaborating Channel 适配器?

问题描述

我看过与我的问题相关的其他帖子,但没有一个答案能帮助我解决我的问题。

我试图按照这里的例子:https ://github.com/garyrussell/spring-integration-samples/tree/master/intermediate/tcp-client-server-multiplex

我有一个接受请求的 Spring Rest API,这些请求被更改为 XML,然后我将它们发送到另一个接受 TCP 请求的应用程序。

使用 TcpOutboundGateway 和 TcpInboundGateway 工作得很好,但是速度很慢,所以我想通过协作通道适配器和多路复用来加速它。

这个想法(据我所知)是通过网关发送请求,该请求桥接到聚合器,该请求也由 TcpSendingMessageHandler 发送到另一个应用程序的 tcp 服务器。然后 TcpReceivingChannelAdapter 监听响应,这些响应被发送到聚合器,在那里它们与它们的请求相关联(CORRELATION_ID 标头的 bc),然后发送到将字节转换为字符串的转换通道。

显然,我的理解是错误的,因为我看不到响应如何返回到网关并且它不起作用。

我可以看到 Socket 正在打开,但在发送消息后它立即关闭,因此反序列化器返回 EOF: null 错误。

  1. 我是否设置了错误的 TcpReceivingChannelAdapter?

  2. 响应如何返回到网关?

  3. 我应该使用 Future 作为网关响应吗?

TCP配置:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpMultiPlexConfig implements ApplicationListener<TcpConnectionEvent> {

    protected final static Logger LOGGER = LoggerFactory.getLogger(TcpMultiPlexConfig.class);

    @Value("${engine.port}")
    private int port;// = 55001;
    @Value("${engine.address}")
    private String ipAddress;// = "192.168.1.1";
    @Value("${engine.timeout}")
    private int timeout;

    @Override
    public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
        TcpConnection source = (TcpConnection) tcpEvent.getSource();
        if (tcpEvent instanceof TcpConnectionOpenEvent) {
            LOGGER.info("********* Socket Opened " + source.getConnectionId());
        } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
            LOGGER.info("*********** Socket Closed " + source.getConnectionId());
        }
    }

    @MessagingGateway(defaultRequestChannel="input")
    public interface MultiPlexGateway {

        String send(@Payload String in, @Header("CORRELATION_ID") String transactionId);

    }
    // TODO the request and response are being put together
    @Bean
    @ServiceActivator(inputChannel = "input")
    public BridgeHandler bridge() {
        BridgeHandler bridge = new BridgeHandler();
        bridge.setOutputChannelName("toAggregatorClient");
        bridge.setOrder(1);
        return bridge;
    }

    @Bean
    public PublishSubscribeChannel input() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public DirectChannel toAggregatorClient() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel noResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel toTransformerClient() {
        return new DirectChannel();
    }

    @Bean
    public TcpReceivingChannelAdapter inAdapterClient() {
        TcpReceivingChannelAdapter receivingAdapter = new TcpReceivingChannelAdapter();
        receivingAdapter.setConnectionFactory(clientConnectionFactory());
        receivingAdapter.setOutputChannel(toAggregatorClient());
        receivingAdapter.setClientMode(true);
        return receivingAdapter;
    }


    @Bean
    @ServiceActivator(inputChannel = "input")
    public TcpSendingMessageHandler outAdapterClient() {
        TcpSendingMessageHandler outAdapter = new TcpSendingMessageHandler();
        outAdapter.setOrder(2);
        outAdapter.setConnectionFactory(clientConnectionFactory());
        outAdapter.setClientMode(true);
        return outAdapter;
    }

    @Bean(name ="clientCFMP")
    public AbstractClientConnectionFactory clientConnectionFactory() {
        TcpNetClientConnectionFactory tcp = new TcpNetClientConnectionFactory(this.ipAddress , this.port);
        tcp.setSerializer(new DefaultSerializer()); // out
//      byte delimeter = "\n".getBytes()[0];
//      ElasticByteArrayRawSingleTerminatorSerializer deserializer = new ElasticByteArrayRawSingleTerminatorSerializer(delimeter);
//      DefaultDeserializer deserializer = new DefaultDeserializer();
        MyDefaultDeserializer deserializer = new MyDefaultDeserializer();
        tcp.setDeserializer(deserializer);

        tcp.setSoTimeout(timeout);
        tcp.setSingleUse(false);
        MapMessageConverter mc = new MapMessageConverter();
        mc.setHeaderNames("CORRELATION_ID");
        tcp.setMapper(new MessageConvertingTcpMessageMapper(mc));

        return tcp;
    }


    @MessageEndpoint
    public static class MyConverters {

        @Transformer(inputChannel="toTransformerClient", outputChannel = "resultToString")
        public byte[] getResponse(MessageGroup payload) {
//          byte[] result = null;
            List<Message<?>>list = new ArrayList<>(payload.getMessages());
            byte[] result = (byte[]) list.get(1).getPayload();
//          LOGGER.info(result);
            return result;
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            String result = new String(bytes);
            LOGGER.info("*********** RESULT => " + result);
            return result;
        }

        @ServiceActivator(inputChannel = "noResponseChannel")
        public MessageTimeoutException  noResponse(String input) {
            throw new MessageTimeoutException("****** No response received for => " + input);
        }

    }



    @Bean
    @ServiceActivator(inputChannel = "toAggregatorClient", outputChannel = "toTransformerClient")
    public FactoryBean<MessageHandler>  aggregatorFactoryBean() {
        AggregatorFactoryBean  afb = new AggregatorFactoryBean ();
        afb.setExpireGroupsUponCompletion(true);
        afb.setExpireGroupsUponTimeout(true);
        afb.setGroupTimeoutExpression(new ValueExpression<>(this.timeout));
        afb.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("CORRELATION_ID"));
        afb.setReleaseStrategy(new MessageCountReleaseStrategy(2));
        afb.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        afb.setSendPartialResultOnExpiry(false);
        afb.setMessageStore(new SimpleMessageStore());
        afb.setDiscardChannel(noResponseChannel());
        return afb;
    }

调用网关的服务:

@Service
public class MultiPlexGatewayTransmission <T extends EngineData> extends AbstractMultiPlexEngineTransmission {

    public MultiPlexGatewayTransmission(MultiPlexGateway gateway) {
        super(gateway);
    }

    @Override
    public T request(EngineData request, Class<? extends EngineData> clazz) {
        String response = gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        if(response == null || response.isEmpty()) {
            return null;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MPGateway response ::: " + response.trim());
        }

        @SuppressWarnings("unchecked")
        T clientResponse = (T) JaxbUtils.unmarshall(response, clazz);
        if (LOGGER.isDebugEnabled()) {
//          LOGGER.debug("*** Unmarshall response ::: " + clientResponse);
        }
        return clientResponse;
    }

测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class ITGetClientsTest extends AbstractEngineTest {

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

//  @Autowired
//  private GatewayTransmission<ClientsResponse> transmission;

    @Autowired
    private MultiPlexGatewayTransmission<ClientsResponse> transmission;

    @Test
    public void testGetClients() {
        LOGGER.info("Gateway test testGetClients... ");

        Api api = new Api();
        api.setIp("192.168.1.1");
        api.setMessageId(UUID.randomUUID().toString());
        api.setVersion("1.0");      
        api.setUserToken(token);

        ClientsRequest request = new ClientsRequest();
        request.setApi(api);

        ClientsResponse response = (ClientsResponse) transmission.request(request, ClientsResponse.class);
        Assert.assertTrue(response != null);
        Assert.assertTrue(!response.getClient().isEmpty());

        LOGGER.info(Arrays.deepToString(response.getClient().toArray()));
    }



}

标签: javaspring-boottcpspring-integrationmultiplexing

解决方案


我没有详细查看您的代码;已经很晚了,又是一个周末,但请参阅此答案以获得更简单的技术,以使用入站/出站连接 ID 关联请求/回复。


推荐阅读