首页 > 解决方案 > 如何从监听器发送回复

问题描述

我正在为我的 Java 和 RabbitMQ 学习做作业。我对 Spring 和 RabbitMQ 不太熟悉,但我无法处理这个问题。

我有 2 个单一的应用程序。

第一个,产生消息(bolid 应用程序) 我创建了消息的生产者(bolid),它每 10 秒向侦听器发送一条消息

@SpringBootApplication
public class BolidApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(BolidApplication.class, args);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {

        Bolid bolid = new Bolid();
        int i = 10;
        while (true) {
            bolid.setData(new Date());
            rabbitTemplate.setReplyAddress("bolidReply");
            rabbitTemplate.convertAndSend("RaceExchange", "raceRouting", bolid.toString());
            rabbitTemplate.convertAndSend("MonitorExchange", "raceRouting", bolid.toString());
            Thread.sleep(15000);
            i += 10;
        }
    }
}

所以,我创建了 2 个队列(RaceQueue 和 MonitorQueue),定义交换并绑定它们。

我有 2 个听众:RaceListener 和 MonitorListener。

有我的听众的代码:

第二个应用程序,即监听器。

public class RabbitConfig {
    private static final String RACE_QUEUE = "RaceQueue";
    private static final String MONITOR_QUEUE = "MonitorQueue";

    @Bean
    Queue myQueue() {
        return new Queue(RACE_QUEUE, true);
    }

    @Bean
    Queue monitorQueue() {
        return new Queue(MONITOR_QUEUE, true);
    }

    @Bean
    Exchange myExchange() { 
        return ExchangeBuilder.topicExchange("RaceExchange")
                .durable(true)
                .build();
    }

    @Bean
    Exchange monitorExchange() { 
        return ExchangeBuilder.topicExchange("MonitorExchange")
                .durable(true)
                .build();
    }

    @Bean
    Binding binding() {
//        return new Binding(MY_QUEUE, Binding.DestinationType.QUEUE, "MyTopicExchange", "topic", null)
        return BindingBuilder
                .bind(myQueue())
                .to(myExchange())
                .with("raceRouting")
                .noargs();
    }

    @Bean
    Binding monitorBinding() {
        return BindingBuilder
                .bind(monitorQueue())
                .to(monitorExchange())
                .with("raceRouting")
                .noargs();
    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        return cachingConnectionFactory;
    }

    @Bean
    MessageListenerContainer rabbitRaceListener() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
        simpleMessageListenerContainer.setQueues(myQueue());
        simpleMessageListenerContainer.setupMessageListener(new RabbitRaceListener());
        return simpleMessageListenerContainer;
    }

    @Bean
    MessageListenerContainer rabbitMonitorListener() {

        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
        simpleMessageListenerContainer.setQueues(monitorQueue());
        simpleMessageListenerContainer.setupMessageListener(new RabbitMonitorListener());
        return simpleMessageListenerContainer;
    }
}

从 MonitorListener 我想使用回复模式将消息回复到我的第一个应用程序(bolid 应用程序)。所以 Bolid 应用程序可以收到我的消息。

我的 MonitorListener 代码:

public class RabbitMonitorListener implements MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void onMessage(Message message) {
        String[] splitted = new String(message.getBody()).split("\\|");
        int oilTemperature = Integer.parseInt(splitted[1].split(" ")[2]);
        int engineTemperature = Integer.parseInt(splitted[2].split(" ")[2]);
        int tirePressure = Integer.parseInt(splitted[3].split(" ")[2]);

        System.out.println("message2 = [" + new String(message.getBody()) + "]");

        if (oilTemperature > 120 || engineTemperature > 120 || tirePressure > 12) {
            System.out.println("SEND REPLY TO BOLID!");
        }
        if (oilTemperature > 150 || engineTemperature > 150 || tirePressure > 17) {
            System.out.println("SEND RELY TO BOLID!");
        }
    }
}

我怎样才能做到这一点?所以在这里我可以将消息发送回 bolid 并且在 bolid 应用程序上我可以阅读它?

编辑:我做了一些研究,我想这样做:

public class RabbitMonitorListener implements MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        String[] splitted = new String(message.getBody()).split("\\|");
        int oilTemperature = Integer.parseInt(splitted[1].split(" ")[2]);
        int engineTemperature = Integer.parseInt(splitted[2].split(" ")[2]);
        int tirePressure = Integer.parseInt(splitted[3].split(" ")[2]);
        String response = "Hello";
        MessageProperties properties = new MessageProperties();
        Message responseMessage = new Message(response.getBytes(), properties);
        rabbitTemplate.send(message.getMessageProperties().getReplyTo(), responseMessage);

        System.out.println("message2 = [" + new String(message.getBody()) + "]");

        if (oilTemperature > 120 || engineTemperature > 120 || tirePressure > 12) {
            System.out.println("WARN MECHANICS");

        }
        if (oilTemperature > 150 || engineTemperature > 150 || tirePressure > 17) {
            System.out.println("WARN MECHANICS");
        }
    }
}

但是 rabbitTemplate 在这里是空的,所以我不能在这里 @Autowired 它。如何在 MessageListener 中访问 rabbitTemplate 和方法 send?

标签: springrabbitmqspring-amqpspring-rabbit

解决方案


new RabbitRaceListener()- 这也必须是一个@Bean,以获得自动接线。

但是,您使事情过于复杂;该框架可以为您处理所有这些。

请参阅客户端的请求/回复消息convertSendAndReceive()- 并使用or convertSendAndReceiveAsType()

在服务器端,请参阅Annotation-driven Listener Endpoints

@RabbitListener(queues = "request")
public String handle(String in) {
    return in.toUpperCase();
}

推荐阅读