首页 > 解决方案 > 具有并发性的 AWS SQS 队列配置的 Spring JMS 侦听器

问题描述

我试图弄清楚如何配置 jms 侦听器以侦听 AWS 队列并在许多线程中处理消息(同时约 100 个)。

下面是我的配置。

@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(AmazonSQS amazonSQS) {
        ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQS);
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("30-100");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
        factory.setErrorHandler(t -> {
        });
        return factory;
    }

}

通过这种配置,我不断收到以下错误:

SQSMessageConsumer - 30秒后无法终止执行器服务ConsumerPrefetch,一些正在运行的线程将立即关闭

此外,将消息发布到AmazonSQS实例需要 20 秒。

我尝试了NumberOfMessagesToPrefetchCacheLevel的不同组合,但没有一个能正常工作。

例如CacheLevel = CACHE_CONSUMER没有错误但一次处理 1 条消息。

请帮我配置一下。谢谢!

图书馆:

标签: javaspringjmsamazon-sqs

解决方案


@Boris我能够通过 以下方式解决此问题。我没有设置缓存级别。提供程序配置仍在禁用预取

ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);

此外,除了未显示的其他值之外,我还使用DefaultJmsListenerContainerFactory并设置了以下值:


DefaultJmsListenerContainerFactory

factory.setConcurrency("5-10"); //This will depend on your use case factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); //Used to in-flight erroneous messages factory.setTaskExecutor(threadPoolTaskExecutor()); //Uses threadpooltask executor, see below factory.setMaxMessagesPerTask(10); //Not required, but to reduce thread-switching. This will depend on your use case


请在使用 ThreadPool Executor 之前阅读 ThreadPool Executor,它写于大约 12 年前,但仍然有效。我没有在@JmsListener 级别上设置并发,因为它会导致额外的复杂


推荐阅读