首页 > 技术文章 > rabbitmq简单模式

Y-wee 2021-12-26 20:31 原文

rabbitmq简单模式

一个生产者发送消息给一个消费者

代码实现

导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.1</version>
</dependency>

工具类:方便获取连接

package com.yl.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * rabbitmq工具类
 *
 * @author Y-wee
 */
public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        connectionFactory.setHost("192.168.84.131");
        connectionFactory.setPort(5672);//mq的默认端口号
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        connectionFactory.setVirtualHost("/");
    }

    /**
     * 获取tcp物理连接
     *
     * @return
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            return connection;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

常量类:存储队列和交换机名称

package com.yl.util;

/**
 * rabbitmq常量类
 *
 * @author Y-wee
 */
public class RabbitConstant {
    // 简单模式队列名
    public static final String QUEUE_HELLOWORLD = "helloworldQueue";
    // 工作队列模式队列名
    public static final String QUEUE_WORK = "workqueueQueue";
    // 发布确认队列名
    public static final String QUEUE_CONFIRM = "confirmQueue";
    // 路由模式队列名
    public static final String QUEUE_ROUTE_A = "routeQueueA";
    public static final String QUEUE_ROUTE_B = "routeQueueB";
    // 主题模式队列名
    public static final String QUEUE_TOPIC_A = "topicQueueA";
    public static final String QUEUE_TOPIC_B = "topicQueueB";
    // 死信队列名
    public static final String QUEUE_NORMAL = "normalQueue";
    public static final String QUEUE_DEAD = "deadQueue";

    // 发布订阅模式交换机名
    public static final String EXCHANGE_PUBSUB = "pubsubExchange";
    // 路由模式交换机名
    public static final String EXCHANGE_ROUTE = "routeExchange";
    // 主题模式交换机名
    public static final String EXCHANGE_TOPIC = "topicExchange";
    // 死信队列交换机名
    public static final String EXCHANGE_NORMAL = "normalExchange";
    public static final String EXCHANGE_DEAD = "deadExchange";
}

消费者

package com.yl.helloword;

import com.rabbitmq.client.*;
import com.yl.util.RabbitConstant;
import com.yl.util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息消费者
 *
 * @author Y-wee
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 打印消息体
            System.out.println("接收到消息:"+new String(message.getBody()));
            // 确认接收到消息,第一个参数是消息的唯一标识,确认接收的是哪个消息;第二个参数表示是否批量确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消费失败");
        };

        /**
         * 消费消息
         * 第一个参数:队列名称
         * 第二个参数:是否自动确认收到消息,false代表手动编程来确认收到消息,这是mq的推荐做法
         * 第三个参数:消费者成功消费的回调
         * 第四个参数:消费者取消消费的回调
         */
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, deliverCallback, cancelCallback);
    }

}

生产者

package com.yl.helloword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yl.util.RabbitConstant;
import com.yl.util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息生产者
 *
 * @author Y-wee
 */
public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = RabbitUtils.getConnection();
        // 创建通信通道,相当于tcp中虚拟连接
        Channel channel = connection.createChannel();
        /**
         * 通过通道创建一个队列,如果队列已存在,则使用这个队列.
         * 第一个参数:队列名称id,
         * 第二个参数:是否持久化。false对应不持久化,mq停掉数据就会丢失
         * 第三个参数:是否队列私有化,false代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才可以使用
         * 第四个参数:是否自动删除,false代表连接停掉后不自动删除这个队列
         *  null:其他的额外参数
         */
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);

        String message = "helloworld";

        /**
         * 发送消息
         * 第一个参数:exchange,交换机,暂时用不到,在后面进行发布订阅时才会用到
         * 第二个参数:队列名称,和上面保持一致
         * 第三个参数:额外设置的属性
         * 第四个参数:要传递的消息字节数组
         */
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());

        System.out.println("数据发送成功");
    }

}

推荐阅读