java - AWS SQS 侦听器 - 断路器
问题描述
我已经构建了一个 SQS 侦听器,它从 SQS 轮询消息并将消息发送到 REST 服务。如果 REST 服务已关闭,我不想在队列准备好之前将消息从队列中拉出。我正在使用 spring cloud 从 SQS 读取消息。是否有任何示例说明如何实现这一目标?
我已经为这个队列配置了死信队列。如果后端服务关闭,我不希望所有消息都以 DLQ 结束。
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
听众
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}
解决方案
推荐阅读
- macos - 如何在 Windows 上使用 VS 创建 MacOS 控制台应用程序
- pandas - pandas 1.0 用 NAN 替换一个字符串 dtype 列
- docker - 每次请求都拒绝 DigitalOcean 液滴连接
- html - 如何拉伸 YouTube 嵌入视频?
- vue.js - 在 Vue.js 中导入文本
- tensorflow - 无法使用 Tensorflow.js predict() 函数
- node.js - 使用 OneSignal Nodejs 发送通知
- reactjs - React Child 不会更新 Parent 的 Hook 状态
- ios - SwiftUI 导航栏不会消失
- python - 从 OrderedDict 列表中查找项目的最 Pythonic 方式是什么?