amazon-sns - 在 AWS 中使用 http 订阅 SNS 主题
问题描述
我在 SNS 主题中创建了一个 Http 订阅。SNS 订阅中提到的 url 是一个 EC2 实例。
需要知道将在 EC2 实例中接收 SNS 订阅请求的位置。
我必须在 EC2 中安装哪些应用程序才能订阅 http 请求。使用的端口是 8080。
解决方案
如 Amazon Doc for SNS HTTP(S) based subscription 中所述...更多
在您将 HTTP 或 HTTPS 终端节点订阅到某个主题之前,您必须确保 HTTP 或 HTTPS 终端节点可以处理 Amazon SNS 用于发送订阅确认和通知消息的 HTTP POST 请求。当您订阅 HTTP 终端节点时,Amazon SNS 会向其发送订阅确认请求。创建订阅时,您的终端节点必须准备好接收和处理此请求,因为此时 Amazon SNS 会发送此请求。在您确认订阅之前,Amazon SNS 不会向终端节点发送通知。确认订阅后,当对订阅主题执行发布操作时,Amazon SNS 将向终端节点发送通知。
在处理SubscriptionConfirmation请求之前,状态始终处于Pending 确认状态,不会收到任何Notification消息。
这是自动处理SubscriptionConfirmation、Notification和UnsubscribeConfirmation messageType 请求的代码片段,以及由 AWS SNS 触发的上述请求的Content-Type始终为文本/纯文本格式。
弹簧启动方式
通过以下代码为 SNS 消费者使用 Spring-boot 的主要优点是您不需要依赖任何 AWS 指定的配置及其依赖项。如果您的微服务是使用spring-boot
而不是使用spring-cloud-aws-messaging
. 您的消费者端应用程序需要执行以下步骤。
将以下注释添加到请求处理程序方法以处理 SubscriptionConfirmation和UnsubscribeConfirmation消息类型。
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SNSSubscriptionUnSubscriptionConfirmation {
}
如果带有注解的请求处理程序方法@SNSSubscriptionUnSubscriptionConfirmation,
将根据 Http(s) 请求标头处理SubscriptionConfirmation和UnsubscribeConfirmationx-amz-sns-message-type
消息类型,则将触发以下方面。
@Aspect
@Component
@Slf4j
public class SNSSubscriptionUnSubscriptionActivation {
@Before(value = "@annotation(SNSSubscriptionUnSubscriptionConfirmation)")
public void SNSSubscriptionUnSubscriptionConfirmationActivation(JoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
HttpServletRequest httpServletRequest = (HttpServletRequest)args[0];
String requestBody = (String)args[1];
String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
if(!StringUtils.isEmpty(messageType)){
if("SubscriptionConfirmation".equals(messageType)){
activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "SubscribeURL");
} else if("UnsubscribeConfirmation".equals(messageType)){
activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "UnsubscribeURL");
}
}
}
private void activateSNSSubscriptionUnSubscription(String requestBody, final String messageType, String topicArn, String subscribeUnsubscribeURLKey) {
log.info(messageType + " payload: {}", requestBody);
JsonMapper mapper = JsonMapper.builder()
.configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
.build();
try {
Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
});
String subscribeUnsubscribeURL = maps.get(subscribeUnsubscribeURLKey);
RestTemplate restTemplate = new RestTemplate();
//Manually activating the subscribe and UnsubscribeURL requests by making direct HTTP call using rest controller
ResponseEntity<Void> response = restTemplate.exchange(subscribeUnsubscribeURL, HttpMethod.GET, null, Void.class);
if (response.getStatusCode().is2xxSuccessful())
log.info("topicArn: {} messageType: {} Successful: {}", topicArn, messageType, response.getStatusCode());
else {
log.error("topicArn: {} messageType: {} failure Status: {}", topicArn, messageType, response.getStatusCode());
}
} catch (JsonProcessingException e){
log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getMessage());
}catch(HttpClientErrorException e) {
log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getResponseBodyAsString());
}
}
}
控制器的 handler 方法处理Notification messageType,因为剩余的两个 messageType 被SNSSubscriptionUnSubscriptionActivation
作为使用 注释的请求处理程序方法处理@SNSSubscriptionUnSubscriptionConfirmation()
。
@Slf4j
@RestController
public class AWSSNSConsumerController {
@PostMapping(value = "subscribed-endpoint", consumes = MediaType.TEXT_PLAIN_VALUE)
@SNSSubscriptionUnSubscriptionConfirmation()
public ResponseEntity<Void> notification(HttpServletRequest httpServletRequest, @RequestBody() String requestBody) {
log.info("Notification payload: {}", requestBody);
String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
if ("Notification".equals(messageType)) {
JsonMapper mapper = JsonMapper.builder()
.configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
.build();
try {
Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
});
String message = maps.get("Message");
log.info("topic : {} message: {} ", topicArn, message);
} catch (JsonProcessingException e) {
log.error("topic : {} Notification failure error: {}", topicArn, e.getMessage());
} catch (HttpClientErrorException e) {
log.error("topic : {} Notification failure error: {}", topicArn, e.getResponseBodyAsString());
}
}
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
}
如果您使用的是spring-cloud-aws,请遵循这个很棒的教程
推荐阅读
- python - 带有循环退出和重新启动的计算器程序
- regex - 我可以在 mac 终端中使用带有 cd 命令的正则表达式吗?
- javascript - 在node.js中运行示例代码成功,但在浏览器中运行修改后的代码失败
- ruby-on-rails - 从 NativeScript 上传到 Rails Shrine
- python - tkinter 或其他 GUI 模块中是否有任何小部件可以制作一个覆盖 python 中任何程序的饼图菜单?
- ajax - 使用 ajax 在 Wordpress 前端删除/删除帖子不起作用
- linux - 从 XCB 中的键名获取键码?
- powershell - 不存在的文件夹上的 PowerShell Get-ChildItem 与 -recurse 标志的行为不同
- grpc - GRPC / Proto - @io.grpc.ExperimentalApi 缺少默认值
- python - 如何解决 Get_dummies 引起的内存错误