首页 > 解决方案 > 侦听器 SQS 停止消费消息

问题描述

嗨,我正在尝试从我的 java 应用程序中的队列中消费消息,但过了一会儿我的消费者停止轮询队列。在我的应用程序中,我有两个侦听器,并且消耗最多的队列通常停止被消耗。这是我的 AWS 配置:

@Configuration
public class AWSConfiguration {

    private final String region;
    private final String accessKey;
    private final String secretKey;

    public AWSConfiguration(
            @Value("${cloud.aws.region.static}") final String region,
            @Value("${cloud.aws.credentials.accessKey}") final String accessKey,
            @Value("${cloud.aws.credentials.secretKey}") final String secretKey) {
        this.region = region;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    private AWSCredentialsProvider getAwsCredentials(final String accessKey, final String secretKey) {
        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
        return new AWSStaticCredentialsProvider(basicAWSCredentials);
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync(){
        return AmazonSQSAsyncClientBuilder
                .standard()
                .withCredentials(getAwsCredentials(accessKey, secretKey))
                .withRegion(region)
                .build();
    }
}

这些是我的 SQSListeners:

@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-receive-mosaics}", deletionPolicy = ON_SUCCESS)
    public void onReceiveMosaics(@Payload String message) {
        LOGGER.trace("m=receiveMosaics(message={})", message);

        GlebeDTO glebeDTO = receiveMosaicsMessageHandler.parseReceiveMessage(message);

        LOGGER.info("Mosaicos do talhão {} recebidos", glebeDTO.getExternalId());

        if(glebeDTO.getMosaics().size() >= 1){ 
            processMosaicReceivedUseCase.processWithMosaics(buildCriteria(glebeDTO));
        }else{
            processMosaicReceivedUseCase.processWithoutMosaics(buildCriteria(glebeDTO));
        }
    }

@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-order-request}", deletionPolicy = ON_SUCCESS)
    public void receiveOrder(@Payload String message) {
        LOGGER.trace("m=receiveOrder(message={})", message);

        OrderRequestDTO orderRequestDTO = this.requestOrderMessageHandler.parseReceiveMessage(message);

        LOGGER.info("{} - Recebida solicitação de mosaicos", orderRequestDTO.getOrderId());

        validator.validate(orderRequestDTO);

        Demand demand = orderRequestDTOToDemandConverter.convert(orderRequestDTO);

        receiveRequestUseCase.onReceiveRequest(demand);
    }

有没有我缺少的配置?

标签: javaamazon-web-servicesamazon-sqs

解决方案


我知道这是一个老问题,我没有答案 - 但我们遇到了类似甚至可能相同的问题。

我们的@SqsListener 在运行好几天后停止提取数据,并且毫无问题地提取了数十万条消息。

我们进行了广泛的监控和日志记录——但监控表明,当这种情况发生时,我们使用的 RAM/CPU 并没有比一切正常的时候多。在此之前或之后,我们的日志不包含任何错误。

似乎 SqsListener 使用的线程只是停止/死亡并且没有恢复活力,我们必须重新启动 Java 服务才能再次进行集成。


推荐阅读