首页 > 技术文章 > activemq 基本使用案例

bigsaltfish 2016-05-03 16:00 原文

  activemq有两种消息模式,一种是p2p,一种是pub。其中p2p是1对1,pub是1对多。这里就以这两种的发布和访问做demo。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * p2p(point-to-point)的消息发送和接收<br>
 * 注意事项:<br>
 *   1.点对点,接收者为一人,发送者也为一人<br>
 *   2.先启动send或者先启动receive都ok
 * @author zy
 *
 */
public class SimpleMessageSendandReceiveApp {
    public static final String user = "system"; // conf/credentials.properties下的配置
    public static final String password = "manager";
    public static final String url = "tcp://localhost:61616";// conf/activemq.xml-transportConnectors节点的openwire子节点
    public static final String queueName = "test_queue"; // 在localhost:8161/admin中创建的queue
    public static final String messageBody = "Hello JMS!";// 发送jms的内容,一般是textMessage或者ObjectMessage
    public static final boolean transacted = false; // 是否使用事务
    public static final boolean persistent = false;//提交方式,true/PERSISTENT 持久保留消息,以保证消息不会因为jms provider的失败而丢失   false/NON_PERSISTENT 不要求保持持久

    public static void main(String[] args) {
        sendMessage();
        
//        receiveMessage();
    }
    
    /**
     * 发送消息
     */
    public static void sendMessage(){
        Connection connection = null;
        Session session = null;

        try {
            // create the connection
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // create the session
            session = connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);// Session.AUTO_ACKNOWLEDGE-->consumer.receive()会话确认接收
            Destination destination = session.createQueue(queueName);

            // create the producer
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }

            // create text message
            Message message = session.createTextMessage(messageBody);

            // send the message
            producer.send(message);
            System.out.println("Send message: " + ((TextMessage) message).getText());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // close session and connection
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 接收消息
     */
    public static void receiveMessage(){
        Connection connection = null;
        Session session = null;

        try {
            // create the connection
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // create the session
            session = connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);// Session.AUTO_ACKNOWLEDGE-->consumer.receive()会话确认接收
            Destination destination = session.createQueue(queueName);

            // create the producer
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }

            // create the consumer
            MessageConsumer consumer = session.createConsumer(destination);
            // blocking till receive the message
            Message recvMessage = consumer.receive();
            System.out.println("Receive message: " + ((TextMessage) recvMessage).getText());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // close session and connection
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布模式(pub),一对多<br>
 * 注意事项:<br>
 *   1.一对多,接收者为多个<br>
 *   2.注册接收的时间 >= 消息发送时间,才可接收消息
 * @author zy
 *
 */
public class TopicMessageSendAndReceiveApp {
    public static final String user = "system"; // conf/credentials.properties下的配置
    public static final String password = "manager";
    public static final String url = "tcp://localhost:61616";// conf/activemq.xml-transportConnectors节点的openwire子节点
    public static final String queueName = "test_topic"; // 在localhost:8161/admin中创建的queue
    public static final String messageBody = "Hello JMS!";// 发送jms的内容,一般是textMessage或者ObjectMessage
    public static final boolean transacted = true; // 是否使用事务
    public static final boolean persistent = false;//提交方式,true/PERSISTENT 持久保留消息,以保证消息不会因为jms provider的失败而丢失   false/NON_PERSISTENT 不要求保持持久
    
    public static void main(String[] args) {
        
        receive();
        receive2();
        
        send();
    }
    public static void send(){
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password,url);  
            Connection connection = factory.createConnection();  
            connection.start();
            
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            Topic topic = session.createTopic(queueName);  
            MessageProducer producer = session.createProducer(topic);  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            for(int i =0;i<3;i++){  
                TextMessage message = session.createTextMessage();  
                message.setText("message_" + System.currentTimeMillis());  
                producer.send(message);  
                System.out.println("Send message: " + message.getText() + "   --->" + i);  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void receive(){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        try {  
            Connection connection = factory.createConnection();  
            connection.start();  
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            Topic topic = session.createTopic(queueName);  
            MessageConsumer consumer = session.createConsumer(topic);  
            consumer.setMessageListener(new MessageListener() {  
                public void onMessage(Message message) {  
                    TextMessage tm = (TextMessage) message;  
                    try {  
                        System.out.println("001 Received message: " + tm.getText());  
                    } catch (JMSException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }
    
    public static void receive2(){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        try {  
            Connection connection = factory.createConnection();  
            connection.start();  
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            Topic topic = session.createTopic(queueName);  
            MessageConsumer consumer = session.createConsumer(topic);  
            consumer.setMessageListener(new MessageListener() {  
                public void onMessage(Message message) {  
                    TextMessage tm = (TextMessage) message;  
                    try {  
                        System.out.println("002 Received message: " + tm.getText());  
                    } catch (JMSException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }
    
}


















推荐阅读