java - 侦听器 SQS 停止消费消息
问题描述
嗨,我正在尝试从我的 java 应用程序中的队列中消费消息,但过了一会儿我的消费者停止轮询队列。在我的应用程序中,我有两个侦听器,并且消耗最多的队列通常停止被消耗。这是我的 AWS 配置:
@Configuration
public class AWSConfiguration {
private final String region;
private final String accessKey;
private final String secretKey;
public AWSConfiguration(
@Value("${cloud.aws.region.static}") final String region,
@Value("${cloud.aws.credentials.accessKey}") final String accessKey,
@Value("${cloud.aws.credentials.secretKey}") final String secretKey) {
this.region = region;
this.accessKey = accessKey;
this.secretKey = secretKey;
}
private AWSCredentialsProvider getAwsCredentials(final String accessKey, final String secretKey) {
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
return new AWSStaticCredentialsProvider(basicAWSCredentials);
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(){
return AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(getAwsCredentials(accessKey, secretKey))
.withRegion(region)
.build();
}
}
这些是我的 SQSListeners:
@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-receive-mosaics}", deletionPolicy = ON_SUCCESS)
public void onReceiveMosaics(@Payload String message) {
LOGGER.trace("m=receiveMosaics(message={})", message);
GlebeDTO glebeDTO = receiveMosaicsMessageHandler.parseReceiveMessage(message);
LOGGER.info("Mosaicos do talhão {} recebidos", glebeDTO.getExternalId());
if(glebeDTO.getMosaics().size() >= 1){
processMosaicReceivedUseCase.processWithMosaics(buildCriteria(glebeDTO));
}else{
processMosaicReceivedUseCase.processWithoutMosaics(buildCriteria(glebeDTO));
}
}
@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-order-request}", deletionPolicy = ON_SUCCESS)
public void receiveOrder(@Payload String message) {
LOGGER.trace("m=receiveOrder(message={})", message);
OrderRequestDTO orderRequestDTO = this.requestOrderMessageHandler.parseReceiveMessage(message);
LOGGER.info("{} - Recebida solicitação de mosaicos", orderRequestDTO.getOrderId());
validator.validate(orderRequestDTO);
Demand demand = orderRequestDTOToDemandConverter.convert(orderRequestDTO);
receiveRequestUseCase.onReceiveRequest(demand);
}
有没有我缺少的配置?
解决方案
我知道这是一个老问题,我没有答案 - 但我们遇到了类似甚至可能相同的问题。
我们的@SqsListener 在运行好几天后停止提取数据,并且毫无问题地提取了数十万条消息。
我们进行了广泛的监控和日志记录——但监控表明,当这种情况发生时,我们使用的 RAM/CPU 并没有比一切正常的时候多。在此之前或之后,我们的日志不包含任何错误。
似乎 SqsListener 使用的线程只是停止/死亡并且没有恢复活力,我们必须重新启动 Java 服务才能再次进行集成。
推荐阅读
- visual-studio - 在 Visual Studio 中找到的带有块的文件图标是什么?
- python - 如何制作专属QCheckBox
- python - dropbox.exceptions.ApiError: ... CreateFileRequestError('validation_error', None)
- javascript - 当类型在反应儿童中变得更加具体时,打字稿文字类型联合问题
- node.js - 在 post 节点 express 中读取并附加
- javascript - 如何使用 TurfJS 获取所有最近的位置?
- javascript - React-Native 如何为每个项目设置不同的状态
- php - 根据 WooCommerce 购物车中的购物车项目数显示消息
- java - Java:如何区分何时声明其自身或其父对象的 Object 实例?
- vue.js - 运行时的 Vue 和 lint:如何配置?