java - 如何使用 Spring Integration 处理来自 AWS SQS FiFo 队列的 10 条以上并发消息
问题描述
我希望能够使用 Spring 集成工作流一次处理 10 条以上的 SQS 消息。
根据这个问题,建议使用ExecutorChannel。我更新了我的代码,但仍然有相同的症状。
如何在多个线程中执行 Spring 集成流以并行使用更多 Amazon SQS 队列消息?
进行此更新后,我的应用程序请求 10 条消息,处理这些消息,并且只有在我在流程结束附近调用amazonSQSClient.deleteMessage之后,它才会接受来自 SQS 队列的另外 10 条消息。
应用程序使用 SQS FiFo 队列。
是否还有其他我遗漏的东西,或者这是使用SqsMessageDeletionPolicy.NEVER然后在流程结束时删除消息的不可避免的症状?由于其他限制,在流程开始时接受消息并不是一个真正的选择。
以下是相关的代码片段,并进行了一些简化,但我希望它能表达问题。
队列配置
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(50);
return executor;
}
@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
return new ExecutorChannel(inputChannelTaskExecutor());
}
我还尝试了 ThreadPoolTaskExecutor 而不是 SimpleAsyncTaskExecutor,结果相同,但我也会将其包括在内,以防它提供其他见解。
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(50);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("spring-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.afterPropertiesSet();
executor.initialize();
return executor;
}
SQS 通道适配器
@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
adapter.setOutputChannel(inputChannel);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
return adapter;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}
简化的主要流程
对我们来说,一个常见的场景是在短时间内获得大量的分支编辑。这个流程只“关心”至少发生了一次编辑。messageTransformer从有效负载文档中提取一个 id 并将其放入标头dsp_docId中,然后我们使用该标头进行聚合(我们在其他几个地方使用此 id,因此我们认为标头是有意义的,而不是在自定义中完成所有工作聚合器)。
ProvisioningServiceActivator检索分支的最新版本,然后路由器决定它是否需要进一步的转换(在这种情况下,它将它发送到transformBranchChannel )或者它可以发送到我们的 PI 实例(通过 sendToPiChannel)。
转换流程(未显示,我认为您不需要它)最终会导致发送到 PI 流程,它只是先完成更多工作。
ListingGroupProcessor捕获所有aws_receiptHandle标头并将它们作为 | 添加到新标头中。分隔列表。
sendToPi 流(和 errorFlow)以调用自定义处理程序结束,该处理程序负责删除由 aws_receiptHandle 字符串列表引用的所有 SQS 消息。
@Bean
IntegrationFlow sqsListener() {
return IntegrationFlows.from(inputChannel)
.transform(messageTransformer)
.aggregate(a -> a.correlationExpression("1")
.outputProcessor(listingGroupProcessor)
.autoStartup(true)
.correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
.groupTimeout(messageAggregateTimeout) // currently 25s
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.get())
.handle(provisioningServiceActivator, "handleStandard")
.route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
.resolutionRequired(false)
.defaultOutputToParentFlow())
.channel(sendtoPiChannel)
.get();
}
解决方案
我想我会将其发布为答案,因为这解决了我的问题,并且可能对其他人有所帮助。作为答案,它更有可能被发现,而不是对可能被忽略的原始问题进行编辑。
首先,我应该注意到我们正在使用FiFo队列。
问题实际上是更进一步的链,我们将MessageGroupId设置为描述数据源的简单值。这意味着我们有非常大的消息组。
从ReceiveMessage文档中您可以看到,在这种情况下,它非常明智地阻止您从该组请求更多消息,因为如果需要将消息放回队列中,则无法保证顺序。
更新发布消息的代码以设置适当的MessageGroupId然后意味着ExecutorChannel按预期工作。
虽然具有特定 MessageGroupId 的消息是不可见的,但在可见性超时到期之前,不会返回更多属于同一 MessageGroupId 的消息。只要另一个 MessageGroupId 也是可见的,您仍然可以接收带有另一个 MessageGroupId 的消息。
推荐阅读
- java - 检查数据库结构是否正确
- angular - 有没有办法同时运行 onNext() 和 onError() 函数而不在两个回调中显式调用它?
- java - elki DBSCAN 的 distanceFunction 参数是什么?
- c# - C# 中的多重继承替换 - 属性中带有逻辑的特定假设示例
- java - 将字符串转换为 int 以将媒体时间转换为分钟和秒时出错
- vaadin - Vaadin 应用程序中的内存增长
- python - 如何使 TXT 文本中的字符串、变量在主代码中成为全局
- python-3.x - 为什么这个字符串不能转换成整数?Python
- laravel - “xxx.scss”不在 Laravel Mix 的 SourceMap 错误中
- for-loop - 在循环中选择行