首页 > 解决方案 > 使用 Spring Integration SQS 在多个队列上聚合未来/响应 SQS Acks/Error

问题描述

我正在使用一个Gateway和在网关内部,我正在循环通过将消息发送到 2 个队列sqs-outbound adapter

我想实现这样的目标:

jsonArray.forEach(data -> {
    Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
    futures.add(result);
});
futures.forEach(f -> {                      
    System.out.println("Future Result -> "+ f.get())
});

网关和 SQS 适配器配置

<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
    <int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>

<!-- Send to 2 Queues -->

    <int:channel id="successChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-1"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-2"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>

我正在查看apply-sequenceand dataChannelaggregatoraggregator必须能够同时处理ackand error

问题:如何将 2 个队列的聚合响应(确认 + 错误)返回到网关?

标签: spring-integrationspring-integration-aws

解决方案


你在正确的方向与那些success-channelfailure-channel。聚合器可以订阅它successChannel并使用默认策略进行关联和发布。使用 anfailure-channel您可能需要一个自定义错误通道,而不是一个全局errorChannel. 从ErrorMessage发送到该通道的消息SqsMessageHandler具有这样的有效负载AwsRequestFailureException,其中它failedMessage实际上包含带有提到的相关详细信息的请求消息。因此,您需要添加一些转换步骤以从异常中提取该信息,然后再继续使用聚合器。


推荐阅读