首页 > 技术文章 > activemq demo指南

xiaoblog 2015-09-17 15:56 原文

   queue与topic的技术特点对比

 

 

topic

queue

概要

Publish Subscribe messaging 发布订阅消息

Point-to-Point 点对点

有无状态

topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障

并不保证publisher发布的每条数据,Subscriber都能接受到。

Queue保证每条数据都能被receiver接收。

消息是否会丢失

一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。

Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器

一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

          Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

 

queue 通讯:

消息提供者:

	 public static void main(String[] args) throws JMSException { 
         // ConnectionFactory :连接工厂,JMS 用它创建连接 
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 
                         ActiveMQConnection.DEFAULT_USER, 
                         ActiveMQConnection.DEFAULT_PASSWORD, 
                         "tcp://localhost:61616"); 
         //JMS 客户端到JMS Provider 的连接 
         Connection connection = connectionFactory.createConnection(); 
         connection.start(); 
         // Session: 一个发送或接收消息的线程 
         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
         // Destination :消息的目的地;消息发送给谁. 
         // 获取session注意参数值my-queue是Query的名字 
         Destination destination = session.createQueue("my-queue"); 
         //Destination destination =  session.createTopic("my-topic");
         // MessageProducer:消息生产者 
         MessageProducer producer = session.createProducer(destination); 
         //设置不持久化 
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
         //发送一条消息 
         sendMsg(session, producer); 
         session.commit(); 
         connection.close(); 
 } 

 public static void sendMsg(Session session, MessageProducer producer) throws JMSException { 
         //创建一条文本消息 
         TextMessage message = session.createTextMessage("Hello ActiveMQ!"); 
         //通过消息生产者发出消息 
         producer.send(message); 
         System.out.println(message.toString()); 
 } 

  消息接收者:

public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination =  session.createQueue("my-queue");
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(1000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

  Topic通讯:

广播提供者:

 private static final int SEND_NUMBER = 5;
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq发送的消息" + i);
            //发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
   
    public static void main(String[] args) {
        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
        		ActiveMQConnectionFactory.DEFAULT_USER,
        		ActiveMQConnectionFactory.DEFAULT_PASSWORD,ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL
              );
        try {
            //构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            //启动
            connection.start();
            //获取操作连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //获取session注意参数值FirstTopic是一个服务器的topic(与queue消息的发送相比,这里是唯一的不同)
            destination = session.createTopic("FirstTopic");
            //得到消息生成者【发送者】
            producer = session.createProducer(destination);
            //设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

  广播接收者:

   private String threadName;

    ReceiveTopic(String threadName) {
         this.threadName = threadName;
    }

    public void run() {
         // ConnectionFactory:连接工厂,JMS用它创建连接
         ConnectionFactory connectionFactory;
         // Connection:JMS客户端到JMS Provider的连接
         Connection connection =null;
         // Session:一个发送或接收消息的线程
         Session session;
         // Destination:消息的目的地;消息发送给谁.
         Destination destination;
         //消费者,消息接收者
         MessageConsumer consumer;
         connectionFactory = new ActiveMQConnectionFactory(
        		 ActiveMQConnectionFactory.DEFAULT_USER,
         		ActiveMQConnectionFactory.DEFAULT_PASSWORD,ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
         try {
               //构造从工厂得到连接对象
               connection = connectionFactory.createConnection();
               //启动
               connection.start();
               //获取操作连接,默认自动向服务器发送接收成功的响应
               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
               //获取session注意参数值FirstTopic是一个服务器的topic
               destination = session.createTopic("FirstTopic");
               consumer = session.createConsumer(destination);
               while (true) {
                    //设置接收者接收消息的时间,为了便于测试,这里设定为100s
                    TextMessage message = (TextMessage) consumer
                                .receive(1 * 1000);
                    if (null != message) {
                          System.out.println("线程"+threadName+"收到消息:" + message.getText());
                    } else {
                          continue;
                    }
               }
         } catch (Exception e) {
               e.printStackTrace();
         } finally {
               try {
                    if (null != connection)
                          connection.close();
               } catch (Throwable ignore) {
               }
         }
    }

    public static void main(String[] args) {
          //这里启动3个线程来监听FirstTopic的消息,与queue的方式不一样三个线程都能收到同样的消息
         ReceiveTopic receive1=new ReceiveTopic("thread1");
         ReceiveTopic receive2=new ReceiveTopic("thread2");
         ReceiveTopic receive3=new ReceiveTopic("thread3");
         Thread thread1=new Thread(receive1);
         Thread thread2=new Thread(receive2);
         Thread thread3=new Thread(receive3);
         thread1.start();
         thread2.start();
         thread3.start();
    }

  如果传输中文:可以设置字符串的编码格式new String(msg.getBytes("utf-8"),"ISO-8859-1");  new String(message.toString().getBytes("ISO-8859-1"), "UTF-8");

 

推荐阅读