首页 > 解决方案 > 使用Java lambda parallelStream时RabbitMQ Spring“无法确定查找键的目标ConnectionFactory”

问题描述

我们有一个使用 RabbitMQ 的 Spring Java 应用程序,下面是场景:

    public void sendMessage(final StagingMessage stagingMessage, final Long timestamp, final String country) {

        final List<TransformedMessage> messages = processMessageList(stagingMessage);

        messages.parallelStream().forEach(message -> {
            final TransformedMessage transformedMessage = buildMessage(timestamp, ApiConstants.POST_METHOD, country);
            myMessageSender.sendQueue(country, transformedMessage);
        });
    }

Connectio Facotory,其中设置了查找键:

@Configuration
@EnableRabbit
public class RabbitBaseConfig {

    @Autowired
    private QueueProperties queueProperties;

    @Bean
    @Primary
    public ConnectionFactory connectionFactory(final ConnectionFactory connectionFactoryA, final ConnectionFactory connectionFactoryB) {

        final SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> map = new HashMap<>();

        for (final String queue : queueProperties.getAQueueMap().values()) {
            map.put("[" + queue + "]", connectionFactoryA);
        }

        for (final String queue : queueProperties.getBQueueMap().values()) {
            map.put("[" + queue + "]", connectionFactoryB);
        }

        simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
        return simpleRoutingConnectionFactory;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {

        return new Jackson2JsonMessageConverter();
    }
}

标签: springrabbitmqspring-amqp

解决方案


欢迎来到堆栈溢出!

在提出这样的问题时,您应该始终显示相关的代码和配置 bean。

我假设您正在使用RoutingConnectionFactory.

它使用 aThreadLocal来存储查找键,因此发送必须发生在设置键的同一线程上。

无论如何,您通常永远不应该在侦听器中异步;你冒着信息丢失的风险。要增加并发性,请使用容器上的并发属性。

编辑

一种技术是在消息头中传达查找键:

    @Bean
    public RabbitTemplate template(ConnectionFactory rcf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rcf);
        Expression expression = new SpelExpressionParser().parseExpression("messageProperties.headers['cfSelector']");
        rabbitTemplate.setSendConnectionFactorySelectorExpression(expression);
        return rabbitTemplate;
    }

    @RabbitListener(queues = "foo")
    public void listen1(String in) {
        IntStream.range(0, 10)
            .parallel()
            .mapToObj(i -> in + i)
            .forEach(val -> {
                this.template.convertAndSend("bar", val.toUpperCase(), msg -> {
                    msg.getMessageProperties().setHeader("cfSelector", "[bar]");
                    return msg;
                });
            });
    }

推荐阅读