首页 > 解决方案 > 在 AWS 中使用 http 订阅 SNS 主题

问题描述

我在 SNS 主题中创建了一个 Http 订阅。SNS 订阅中提到的 url 是一个 EC2 实例。

需要知道将在 EC2 实例中接收 SNS 订阅请求的位置。

我必须在 EC2 中安装哪些应用程序才能订阅 http 请求。使用的端口是 8080。

标签: amazon-sns

解决方案


如 Amazon Doc for SNS HTTP(S) based subscription 中所述...更多

在您将 HTTP 或 HTTPS 终端节点订阅到某个主题之前,您必须确保 HTTP 或 HTTPS 终端节点可以处理 Amazon SNS 用于发送订阅确认和通知消息的 HTTP POST 请求。当您订阅 HTTP 终端节点时,Amazon SNS 会向其发送订阅确认请求。创建订阅时,您的终端节点必须准备好接收和处理此请求,因为此时 Amazon SNS 会发送此请求。在您确认订阅之前,Amazon SNS 不会向终端节点发送通知。确认订阅后,当对订阅主题执行发布操作时,Amazon SNS 将向终端节点发送通知。

在处理SubscriptionConfirmation请求之前,状态始终处于Pending 确认状态,不会收到任何Notification消息。


这是自动处理SubscriptionConfirmationNotificationUnsubscribeConfirmation messageType 请求的代码片段,以及由 AWS SNS 触发的上述请求的Content-Type始终为文本/纯文本格式。

弹簧启动方式

通过以下代码为 SNS 消费者使用 Spring-boot 的主要优点是您不需要依赖任何 AWS 指定的配置及其依赖项。如果您的微服务是使用spring-boot而不是使用spring-cloud-aws-messaging. 您的消费者端应用程序需要执行以下步骤。

将以下注释添加到请求处理程序方法以处理 SubscriptionConfirmationUnsubscribeConfirmation消息类型。

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SNSSubscriptionUnSubscriptionConfirmation {
}

如果带有注解的请求处理程序方法@SNSSubscriptionUnSubscriptionConfirmation,将根据 Http(s) 请求标头处理SubscriptionConfirmationUnsubscribeConfirmationx-amz-sns-message-type消息类型,则将触发以下方面。

@Aspect
@Component
@Slf4j
public class SNSSubscriptionUnSubscriptionActivation {

    @Before(value = "@annotation(SNSSubscriptionUnSubscriptionConfirmation)")
    public void SNSSubscriptionUnSubscriptionConfirmationActivation(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        HttpServletRequest httpServletRequest = (HttpServletRequest)args[0];
        String requestBody = (String)args[1];
        String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
        String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
        if(!StringUtils.isEmpty(messageType)){
            if("SubscriptionConfirmation".equals(messageType)){
                activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "SubscribeURL");
            } else if("UnsubscribeConfirmation".equals(messageType)){
                activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "UnsubscribeURL");
            }
        }
    }

    private void activateSNSSubscriptionUnSubscription(String requestBody, final String messageType, String topicArn, String subscribeUnsubscribeURLKey) {
        log.info(messageType + " payload: {}", requestBody);
        JsonMapper mapper = JsonMapper.builder()
                .configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
                .build();
        try {
            Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
            });
            String subscribeUnsubscribeURL = maps.get(subscribeUnsubscribeURLKey);
            RestTemplate restTemplate = new RestTemplate();
//Manually activating the subscribe and UnsubscribeURL requests by making direct HTTP call using rest controller
            ResponseEntity<Void> response = restTemplate.exchange(subscribeUnsubscribeURL, HttpMethod.GET, null, Void.class);
            if (response.getStatusCode().is2xxSuccessful())
                log.info("topicArn: {} messageType: {} Successful: {}", topicArn, messageType, response.getStatusCode());
            else {
                log.error("topicArn: {} messageType: {} failure Status: {}", topicArn, messageType, response.getStatusCode());
            }
        } catch (JsonProcessingException e){
            log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getMessage());
        }catch(HttpClientErrorException e) {
            log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getResponseBodyAsString());
        }
    }
}

控制器的 handler 方法处理Notification messageType,因为剩余的两个 messageType 被SNSSubscriptionUnSubscriptionActivation作为使用 注释的请求处理程序方法处理@SNSSubscriptionUnSubscriptionConfirmation()

@Slf4j
@RestController
public class AWSSNSConsumerController {

    @PostMapping(value = "subscribed-endpoint", consumes = MediaType.TEXT_PLAIN_VALUE)
    @SNSSubscriptionUnSubscriptionConfirmation()
    public ResponseEntity<Void> notification(HttpServletRequest httpServletRequest, @RequestBody() String requestBody) {
        log.info("Notification payload: {}", requestBody);
        String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
        String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
        if ("Notification".equals(messageType)) {
            JsonMapper mapper = JsonMapper.builder()
                    .configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
                    .build();
            try {
                Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
                });
                String message = maps.get("Message");
                log.info("topic : {} message: {} ", topicArn, message);
            } catch (JsonProcessingException e) {
                log.error("topic : {} Notification failure error: {}", topicArn, e.getMessage());
            } catch (HttpClientErrorException e) {
                log.error("topic : {} Notification failure error: {}", topicArn, e.getResponseBodyAsString());
            }
        }
        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
    }
}

如果您使用的是spring-cloud-aws,请遵循这个很棒的教程


推荐阅读