AWS SqsListener: deserializing a custom object with Jackson

Problem description

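I can only listen for strings. As soon as I try to receive a custom object, the listener throws the error below. It seems I need to teach Spring how to handle my custom object (B2BOrder).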

org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.b2breservas.api.model.B2BOrder] for GenericMessage [payload={"comments":"95d29059-8552-42fa-8fd9-a1d776416269"},

My SqsConfig

@Configuration
@EnableSqs
public class SqsConfig {

    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";

    @Bean
    public QueueMessagingTemplate myMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resolver) {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new ParameterNamesModule())
                .registerModule(new Jdk8Module())
                .registerModule(new JodaModule())
                .registerModule(new JavaTimeModule());
        // configure the Jackson mapper as needed
        // maybe I need to do something here!

        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setSerializedPayloadClass(String.class);
        converter.setStrictContentTypeMatch(false);
        converter.setObjectMapper(mapper);

        return new QueueMessagingTemplate(amazonSqs, resolver, converter);
    }

    @Bean
    public ClientConfiguration sqsClientConfiguration() {
        return new ClientConfiguration()
                .withConnectionTimeout(30000)
                .withRequestTimeout(30000)
                .withClientExecutionTimeout(30000);
    }

    @Bean
    public ExecutorFactory sqsExecutorFactory() {
        return new ExecutorFactory() {
            @Override
            public ExecutorService newExecutor() {
                return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            }
        };
    }

    @Value("${b2b.b2b.accesstoken}")
    public String accesstoken;

    @Value("${b2b.b2b.secretkey}")
    public String secretkey;

    @Bean
    public AmazonSQSAsync amazonSqs(ClientConfiguration sqsClientConfiguration, ExecutorFactory sqsExecutorFactory) {
        BasicAWSCredentials credential = new BasicAWSCredentials(accesstoken, secretkey);
        return AmazonSQSAsyncClientBuilder.standard()
                .withClientConfiguration(sqsClientConfiguration)
                .withExecutorFactory(sqsExecutorFactory)
//                .withEndpointConfiguration(sqsEndpointConfiguration)
//                .withCredentials(credentialsProvider)
                .withCredentials(new AWSStaticCredentialsProvider(credential))
                .build();
    }


    @Bean
    public AsyncTaskExecutor queueContainerTaskEecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(DEFAULT_THREAD_NAME_PREFIX);
        threadPoolTaskExecutor.setCorePoolSize(2);
        threadPoolTaskExecutor.setMaxPoolSize(2);
        // No thread pool executor queue, to avoid retaining messages too long in memory
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setAutoStartup(true);
//        factory.setQueueMessageHandler();
        factory.setMaxNumberOfMessages(1);
        factory.setWaitTimeOut(20);
        factory.setTaskExecutor(queueContainerTaskEecutor);
        return factory;
    }

}

Listener

@Component
public class SqsHub {


    @SqsListener(
            "https://sqs.us-west-2.amazonaws.com/3234/32443-checkout.fifo"
    )
    public void listen(B2BOrder message) {
//  public void listen(String message) { THIS WORKS!!   
        System.out.println("!!!! received message " + message);
    }
}

Sending

    ....
    @Autowired
    AmazonSQSAsync amazonSqs;

    @GetMapping("/yay")
    public String yay() {
        try {
            B2BOrder pendingOrder = new B2BOrder();
            pendingOrder.setComments(UUID.randomUUID().toString());
            String pendingOrderJson = objectMapper.writeValueAsString(pendingOrder);
            QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
            Map<String, Object> headers = new HashMap<>();
            headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "my-application");
            headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
            queueMessagingTemplate.convertAndSend("booking-checkout.fifo", pendingOrderJson, headers);
        } catch (final AmazonClientException | JsonProcessingException ase) {
            System.out.println("Error Message: " + ase.getMessage());
        }
        return "sdkjfn";
    }
    ....

A simple custom object


public class B2BOrder implements Serializable {

    @JsonProperty
    private String comments;

}

Update

@Michiel's answer got me this far, but I still get the same error.

    @Autowired
    public ObjectMapper objectMapper;


    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setAutoStartup(true);
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSqs);
        MappingJackson2MessageConverter jsonMessageConverter = new MappingJackson2MessageConverter();
        jsonMessageConverter.setObjectMapper(objectMapper);
        queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(jsonMessageConverter));
        factory.setQueueMessageHandler(queueMessageHandlerFactory.createQueueMessageHandler());
//        factory.setMaxNumberOfMessages(1);
        factory.setWaitTimeOut(20);
        factory.setTaskExecutor(queueContainerTaskEecutor);
        return factory;
    }

Tags: java, spring, amazon-web-services, jackson, amazon-sqs

Solution


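Although you have registered a MessageConverter, it is only configured for outgoing requests (through the QueueMessagingTemplate). Your message listener has no MessageConverter configured, so incoming messages can only be received as a "raw" type such as String.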
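To make the failure mode concrete, here is a plain-Java toy model of a converter chain. It is only a sketch: `ConverterChainSketch`, `Converter`, `Order`, `STRING_ONLY` and `ORDER_AWARE` are hypothetical names invented for illustration, not Spring or Spring Cloud AWS classes, and the "JSON" converter merely wraps the raw text instead of parsing it. It shows why a listener whose chain only knows about String can accept a String parameter but rejects a custom type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Toy model only: every type here is a hypothetical stand-in, not a Spring class.
final class ConverterChainSketch {

    interface Converter {
        // Returns the converted payload, or empty if targetType is not supported.
        Optional<Object> fromMessage(String payload, Class<?> targetType);
    }

    // Stand-in for the custom payload type (B2BOrder in the question).
    static final class Order {
        final String rawJson;

        Order(String rawJson) {
            this.rawJson = rawJson;
        }
    }

    // Hands the raw text through, so it can only serve String parameters.
    static final Converter STRING_ONLY = (payload, targetType) -> {
        if (targetType == String.class) {
            return Optional.of(payload);
        }
        return Optional.empty();
    };

    // Stand-in for a JSON converter; it wraps the text rather than parsing it.
    static final Converter ORDER_AWARE = (payload, targetType) -> {
        if (targetType == Order.class) {
            return Optional.of(new Order(payload));
        }
        return Optional.empty();
    };

    // Tries each converter in order and fails if none can produce targetType.
    static Object convert(List<Converter> converters, String payload, Class<?> targetType) {
        for (Converter converter : converters) {
            Optional<Object> result = converter.fromMessage(payload, targetType);
            if (result.isPresent()) {
                return result.get();
            }
        }
        throw new IllegalStateException(
                "Cannot convert from [java.lang.String] to [" + targetType.getName() + "]");
    }

    public static void main(String[] args) {
        String payload = "{\"comments\":\"example\"}";
        List<Converter> listenerDefault = Collections.singletonList(STRING_ONLY);
        List<Converter> withOrderSupport = Arrays.asList(STRING_ONLY, ORDER_AWARE);

        // A String parameter is served by the string-only chain.
        System.out.println(convert(listenerDefault, payload, String.class));
        // The custom type is only served once a suitable converter is in the chain.
        System.out.println(convert(withOrderSupport, payload, Order.class).getClass().getSimpleName());
    }
}
```

In this model, calling `convert(listenerDefault, payload, Order.class)` throws, which mirrors the "Cannot convert from [java.lang.String]" error in the question; the real lookup inside Spring is more involved (content types, argument resolvers) and is not reproduced here.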

In your snippet, you commented out the following line:

//        factory.setQueueMessageHandler();

This is where you can set a QueueMessageHandler, which can itself have one or more MessageConverters attached.

[Edit] Sure:

QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
handlerFactory.setMessageConverters(yourJacksonConfig);
QueueMessageHandler messageHandler = handlerFactory.createQueueMessageHandler();
factory.setQueueMessageHandler(messageHandler);
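As one possible way to fill in the `yourJacksonConfig` placeholder, the converter list can hold a single MappingJackson2MessageConverter. The configuration fragment below is a sketch under assumptions: it targets spring-cloud-aws-messaging 2.x (the `org.springframework.cloud.aws` packages), it assumes an ObjectMapper bean exists in the context, and the class name `SqsListenerConverterConfig` is hypothetical. It declares the QueueMessageHandlerFactory as a bean instead of calling `createQueueMessageHandler()` by hand, which is a variation on the snippet above rather than the answerer's exact method.

```java
import java.util.Collections;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class SqsListenerConverterConfig {

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory(AmazonSQSAsync amazonSqs,
                                                                 ObjectMapper objectMapper) {
        // Jackson-backed converter for incoming listener payloads.
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(objectMapper);
        // SQS message bodies are text, so serialize to and from String.
        converter.setSerializedPayloadClass(String.class);
        // Accept messages that carry no content-type header.
        converter.setStrictContentTypeMatch(false);

        QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
        handlerFactory.setAmazonSqs(amazonSqs);
        handlerFactory.setMessageConverters(Collections.singletonList(converter));
        return handlerFactory;
    }
}
```

Whether this alone is enough may also depend on how the message was sent: if the sender attaches a content type other than JSON, the Jackson converter may still decline the message, so it is worth checking the headers of a received message when the error persists.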

This Spring documentation may help.

