spring-integration - 使用 Spring Integration SQS 在多个队列上聚合未来/响应 SQS Acks/Error
问题描述
我正在使用一个Gateway
和在网关内部,我正在循环通过将消息发送到 2 个队列sqs-outbound adapter
。
我想实现这样的目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});
网关和 SQS 适配器配置
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
我正在查看apply-sequence
and dataChannel
,aggregator
但aggregator
必须能够同时处理ack
and error
。
问题:如何将 2 个队列的聚合响应(确认 + 错误)返回到网关?
解决方案
你在正确的方向与那些success-channel
和failure-channel
。聚合器可以订阅它successChannel
并使用默认策略进行关联和发布。使用 anfailure-channel
您可能需要一个自定义错误通道,而不是一个全局errorChannel
. 从ErrorMessage
发送到该通道的消息SqsMessageHandler
具有这样的有效负载AwsRequestFailureException
,其中它failedMessage
实际上包含带有提到的相关详细信息的请求消息。因此,您需要添加一些转换步骤以从异常中提取该信息,然后再继续使用聚合器。
推荐阅读
- unity3d - 使用注视控件忽略空间映射
- javascript - prototypejs - 为带有类的按钮创建点击事件
- sql-server - RDS SQL Server 压缩
- c# - 如何使用spring.net通过codeconfig实现构造函数、属性和方法注入?
- python - pandas groupby,只保留第一次出现的行
- excel - 第 1 列中的两个标准并在第 3 列中添加相应的值
- java - 如何在用户输入 JTextArea 时获取输入文本的高度?
- tcl - tcl/tk panedwindows,如何为子窗口定义百分比大小,以便手动调整大小保持显示比例?
- python - 在 Python 上的 SQL 查询中使用变量
- c++ - 如何防止用餐哲学家c ++中的僵局