Is there a way to consume messages from a RabbitMQ topic asynchronously?

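Problem description

I am trying to send a message to a RabbitMQ topic and then run the class that receives it. For some reason, though, my consumer only receives messages synchronously. I have seen examples of how to do this with a queue, but with a topic it seems to be a bit different. Can anyone tell me what I am doing wrong?

Here are my classes: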

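import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {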

    private static final String EXCHANGE_NAME = "topic_logs";
    private static String message;


    public static void main(String[] args) throws Exception {
        String message = getMessage("#");
        System.out.println(message);
    }

    public static String getMessage(String argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, argv);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });

        EmitLogTopic.sendMessage();

        connection.close();

        return message;
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    // Publishes one message to the topic exchange and returns the text that was sent.
    public static String sendMessage() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting();
            String message = getMessage();

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            return message;
        }
    }

    private static String getRouting() {
        return "anonymous.info";
    }

    private static String getMessage() {
        return "Hello World!";
    }

}

Tags: java, rabbitmq

Solution
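
One likely explanation, offered as an assumption rather than a confirmed diagnosis: channel.basicConsume only registers the DeliverCallback and returns immediately, and the RabbitMQ Java client later invokes that callback on one of its own threads. In the code from the question, getMessage therefore reaches connection.close() and return message before the delivery has been handled, so the static field may still be null. A common way to bridge this is to let the callback complete a CompletableFuture and have the caller wait on it with a timeout. The sketch below shows only that hand-off and does not need a broker: fakeBasicConsume is a hypothetical stand-in for channel.basicConsume, and the names CallbackHandoff and awaitMessage are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class CallbackHandoff {

    // Hypothetical stand-in for channel.basicConsume(...): it registers the
    // callback and returns at once; the callback runs later on another thread.
    static void fakeBasicConsume(ExecutorService consumerThread,
                                 byte[] body,
                                 Consumer<byte[]> deliverCallback) {
        consumerThread.submit(() -> {
            try {
                Thread.sleep(100); // simulate the delay before a delivery arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            deliverCallback.accept(body);
        });
    }

    // Waits until the callback has produced a message, or fails with a
    // TimeoutException if nothing arrives within timeoutMillis.
    static String awaitMessage(byte[] body, long timeoutMillis) throws Exception {
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> received = new CompletableFuture<>();

            // The callback hands the decoded body to the waiting caller
            // instead of writing it to a static field.
            fakeBasicConsume(consumerThread, body,
                    bytes -> received.complete(new String(bytes, StandardCharsets.UTF_8)));

            // Block here, before any cleanup, until the callback has fired.
            return received.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            consumerThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = "Hello World!".getBytes(StandardCharsets.UTF_8);
        String message = awaitMessage(body, 5000);
        System.out.println(" [x] Received '" + message + "'");
    }
}
```

Applied to ReceiveLogsTopic, the same idea would mean calling received.complete(...) inside the DeliverCallback and calling received.get(...) after EmitLogTopic.sendMessage() but before connection.close(). If the consumer is meant to keep receiving messages rather than return a single one, the alternative is to leave the connection open and do the work inside the callback.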

