首页 > 解决方案 > 如何通过两个账户使用 AmazonSQS 侦听器

问题描述

我有两个工人阶级的申请。我希望他们从 AWS SQS 中提取,但来自两个不同的帐户。我正在使用@SQSListener 来实现这一点。我无法为每个队列设置正确的 AmazonSQS 客户端。尝试使用自定义destionationResolver,但它再次无法访问正确的 amazonSQS 客户端 bean。我正在使用 AmazonSQSAsync 也许这是问题的一部分。使用自定义目标解析器,我被拒绝访问其中一个队列。我的配置代码:

@Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonSQSAsync amazonSQS() {

        AmazonSQSAsync amazonSQSAsyncClient = new AmazonSQSAsyncClient(new AWSCredentialsProvider() {
            public void refresh() {}

            public AWSCredentials getCredentials() {
                return new AWSCredentials() {
                    public String getAWSSecretKey() {return secretKey;}

                    public String getAWSAccessKeyId() {return accessKey;}
                };
            }
        });

        QueueBufferConfig config = new QueueBufferConfig();
        config.setMaxBatchOpenMs(maxBatchOpenMs);
        config.setMaxBatchSize(maxBatchSize);

        LOGGER.info("SQS Client Initialized Successfully");
        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient, config);
    }

@Bean(destroyMethod = "shutdown")
    @Qualifier("workerSQS")
    public AmazonSQSAsync workerSQS() {
        final ClientConfiguration cc = new ClientConfiguration();
        cc.setConnectionTimeout(listenerConnectionTimeout);
        cc.setSocketTimeout(listenerSocketTimeout);
        cc.setMaxConnections(listenerMaxConnection);
        cc.setRequestTimeout(listenerRequestTimeout);
        cc.setUseReaper(true);
        //cc.setConnectionMaxIdleMillis();

        AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {
            public void refresh() {}
            public AWSCredentials getCredentials() {
                return new AWSCredentials() {
                    public String getAWSSecretKey() {return routingSecretKey;}

                    public String getAWSAccessKeyId() {return routingAccessKey;}
                };
            }
        };

        AmazonSQSAsync amazonSQSAsyncClient = AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(awsCredentialsProvider)
                .withRegion(Regions.US_EAST_1)
                .withClientConfiguration(cc)
                .build();

        // See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-client-side-buffering-request-batching.html
        // for QueueBufferConfig Configuration Parameters
        QueueBufferConfig config = new QueueBufferConfig();
        config.setLongPoll(true);

        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient, config);
    }

 @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
        SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
        msgListenerContainerFactory.setBackOffTime(listenerBackOffTime);
        msgListenerContainerFactory.setWaitTimeOut(listenerWaitTimeOut);
        msgListenerContainerFactory.setVisibilityTimeout(listenerVisibilityTimeOut);
        msgListenerContainerFactory.setMaxNumberOfMessages(listenerMaxMessagesPerPoll);
        msgListenerContainerFactory.setDestinationResolver(destinationResolver());
        return msgListenerContainerFactory;
    }

    @Bean
    public CustomDestinationResolver destinationResolver(){
        return new CustomDestinationResolver();
    }

    @Component
    public static class CustomDestinationResolver implements DestinationResolver{

        @Autowired
        private AmazonSQS amazonSQS;

        @Autowired
        @Qualifier("workerSQS")
        private AmazonSQSAsync amazonSQSAsync;

        @Override
        public String resolveDestination(String name) throws DestinationResolutionException {
            String queueName = name;

            if (queueName.startsWith("tl")) {
                try {
                    GetQueueUrlResult getQueueUrlResult = amazonSQSAsync.getQueueUrl(new GetQueueUrlRequest(name));
                    return getQueueUrlResult.getQueueUrl();
                } catch (QueueDoesNotExistException var4) {
                    throw new DestinationResolutionException(var4.getMessage(), var4);
                }
            } else {
                try {
                    GetQueueUrlResult getQueueUrlResult = amazonSQS.getQueueUrl(new GetQueueUrlRequest(name));
                    return getQueueUrlResult.getQueueUrl();
                } catch (QueueDoesNotExistException var4) {
                    throw new DestinationResolutionException(var4.getMessage(), var4);
                }
            }

        }
    }

标签: javaspringamazon-web-servicesamazon-sqs

解决方案


我无法使用 SQS 侦听器做到这一点,所以我尝试使用 JMS 侦听器并且它有效。我只是创建了两个 JMS listenerContainerFactory 并使用了它们。每个侦听器都有不同的 AWS 账户


推荐阅读