首页 > 技术文章 > RabbitMQ(topic主题模式)

sxmblogs 2021-05-18 19:29 原文

一、topic主题模式

特点:模糊的routingkey的匹配模式
注意:*代表是必须为一个;#代表0个或者多个

二、代码

RabbitMQ界面配置

创建交换机

创建队列

将队列绑定在交换机上

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //所有的中间件技术都是基于TCP/IP协议基础构建的协议规范,rabbitmq遵循的是ampq协议
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.创建连接Connection
        Connection connection = connectionFactory.newConnection();
        //3.通过连接获取通道channel
        Channel channel = connection.createChannel();
        //4.通过通道创建交换机、声明队列、绑定关系、路由key、发送消息、和接受消息


        //5.准备消息内容
        String msg="hello topic";
        //6.准备交换机
        String exchangeName="topic_exchange";
        //定义路由key
        String routingkey="a.add";
        //定义指定交换机类型
        String exchangeType="topic";
//        //声明队列
//        channel.queueDeclare("q1",false,false,false,null);
//        channel.queueDeclare("q2",false,false,false,null);
//        channel.queueDeclare("q3",false,false,false,null);
//        //声明交换机
//        channel.exchangeDeclare("fanout_exchange","fanout");
//        //将队列绑定到交换机
//        channel.queueBind("q1",exchangeName,null);
//        channel.queueBind("q2",exchangeName,null);
//        channel.queueBind("q3",exchangeName,null);

        //6.发送消息给队列queue
        /*
        参数一:交换机
        参数二:队列、路由key
        参数三:消息的状态控制
        参数四:消息主体
         */
        channel.basicPublish(exchangeName,routingkey,null,msg.getBytes());
        //7.关闭通道
        channel.close();
        //8.关闭连接
        connection.close();
        System.out.println("生产成功");
    }
}

消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //获取队列名称
            final String queueName=Thread.currentThread().getName();
            Connection connection = null;
            Channel channel=null;
            try {
                //2.创建连接Connection
                connection = connectionFactory.newConnection();
                //3.通过连接获取通道channel
                 channel = connection.createChannel();

                 //定义接受消息回调
                Channel finalChannel =channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(delivery.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {

                    }
                });
                System.out.println(queueName+":开始接受消息");
                System.in.read();

            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                System.out.println("发送消息异常。");
            }finally {
                //7.关闭通道释放连接
                if (channel !=null && channel.isOpen()){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                    if (connection !=null && connection.isOpen()){
                        try {
                            connection.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

        }
    };


    public static void main(String[] args)  {
        //启动三个线程
        new Thread(runnable,"q7").start();
        new Thread(runnable,"q8").start();
        new Thread(runnable,"q9").start();
    }
}

add的结果


推荐阅读