java - 具有并发性的 AWS SQS 队列配置的 Spring JMS 侦听器
问题描述
我试图弄清楚如何配置 jms 侦听器以侦听 AWS 队列并在许多线程中处理消息(同时约 100 个)。
下面是我的配置。
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(AmazonSQS amazonSQS) {
ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQS);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("30-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
factory.setErrorHandler(t -> {
});
return factory;
}
}
通过这种配置,我不断收到以下错误:
SQSMessageConsumer - 30秒后无法终止执行器服务ConsumerPrefetch,一些正在运行的线程将立即关闭
此外,将消息发布到AmazonSQS实例需要 20 秒。
我尝试了NumberOfMessagesToPrefetch和CacheLevel的不同组合,但没有一个能正常工作。
例如CacheLevel = CACHE_CONSUMER没有错误但一次处理 1 条消息。
请帮我配置一下。谢谢!
图书馆:
aws-java-sdk:1.11.41
弹簧-jms:5.1.7
amazon-sqs-java-messaging-lib:1.0.6
解决方案
@Boris我能够通过 以下方式解决此问题。我没有设置缓存级别。提供程序配置仍在禁用预取
ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
此外,除了未显示的其他值之外,我还使用DefaultJmsListenerContainerFactory
并设置了以下值:
DefaultJmsListenerContainerFactory
factory.setConcurrency("5-10"); //This will depend on your use case
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); //Used to in-flight erroneous messages
factory.setTaskExecutor(threadPoolTaskExecutor()); //Uses threadpooltask executor, see below
factory.setMaxMessagesPerTask(10); //Not required, but to reduce thread-switching. This will depend on your use case
请在使用 ThreadPool Executor 之前阅读 ThreadPool Executor,它写于大约 12 年前,但仍然有效。我没有在@JmsListener 级别上设置并发,因为它会导致额外的复杂性
推荐阅读
- javascript - connectedCallback 与 window.load
- java - Java 对象中方法的数量会影响它的重量吗?
- css - 防止 Bootstrap 按钮填充 flexbox
- c - 仅包含头文件中的特定定义
- reactjs - 如何将 Axios 标头设置为实例(非全局)
- python-3.x - 从保存的 DNN 估计器进行预测,Predict_Dict 无效
- api-platform.com - 方法不起作用的安全性
- javascript - 如何更改此字符串中字母的形式?
- android - Android studio——如何使用gradle在Java包中包含资源文件
- python - 拆分数据框列中的值列表