首页 > 技术文章 > 转载-ActiveMQ的简单实例 - 生产者消费者模式

reload-sun 2017-12-12 20:07 原文

转载:http://blog.csdn.net/zgljl2012/article/details/52998446

安装

首先,需要去官网下载windows版本(如果使用的是Linux,就下载对应Linux的)的ActiveMQ并安装,下载地址

下载完后解压缩,进入bin目录,打开一个控制台,输入:

activemq.bat start

 

就可以启动ActiveMQ了。


Github上项目地址:https://github.com/zgljl2012/activemq-learn

 

实现

在Eclipse中创建一个Maven项目(也可以是普通Java项目),名称为activemq-learn(可随意取)。在pom.xml文件中添加ActiveMQ的依赖包:

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.14.1</version>
    </dependency>

如果不想要下载Maven的话,可以直接下载Jar包。

下面先贴出测试代码:

public class Client {

    public static void main(String[] args) throws InterruptedException {
        String username = "admin";
        String password = "admin";
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        // 创建生产者
        Producer producer;
        try {
            producer = new Producer(url, username, password);
            // 生产者产生一条消息
            producer.sendMessage("Hello World");
            // producer.close();
            // 创建消费者
            Consumer consumer = new Consumer(url, username, password);
            // 消费者读取一条消息
            Object msg = consumer.receive();

            // 输出消息内容
            System.out.println(msg);

            producer.close();
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }   
    }
}

过程如下: 创建生产者 -> 生产者产生消息 -> 发送给消息队列 创建消费者 -> 监听消息队列 -> 获得消息

生产者,Producer代码:

/**
 * 生产者
 * @author zgljl2012
 */
public class Producer implements Cloneable{

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 连接地址,host:port,如localhost:8161
     */
    private String url;

    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "HELLO";

    //连接工厂
    private ConnectionFactory connectionFactory;

    //会话 接受或者发送消息的线程
    private Session session;

    //消息的目的地
    private Destination destination;

    //消息生产者
    private MessageProducer messageProducer;

    //与MQ的连接
    private Connection connection = null;

    public Producer(String url, String username, String password) throws JMSException {
        this.url = url;
        this.username = username;
        this.password = password;
        this.init();
    }

    /**
     * 初始化方法
     * @throws JMSException 
     */
    private void init() throws JMSException {
        // 实例化工厂
        connectionFactory = new ActiveMQConnectionFactory(
                username, password, url);
        // 获取连接
        connection = connectionFactory.createConnection();
        // 启动
        connection.start();
        // 获取session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建指定队列名的队列
        destination = session.createQueue(Producer.QUEUE_NAME);
        // 创建消息生产者
        messageProducer = session.createProducer(destination);
    }

    /**
     * 发送消息
     * @param data 数据
     * @throws JMSException 
     */
    public void sendMessage(String data) throws JMSException {
        // 创建一个Message
        TextMessage msg = session.createTextMessage(data);
        // 通过消息发送者进行消息发送
        this.messageProducer.send(msg);
    }

    public void close() throws JMSException {
        session.close();
        connection.close();
    }
}

消费者,Cursumer.java

/**
 * 消费者
 * @author zgljl2012
 *
 */
public class Consumer {

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 连接地址,host:port,如localhost:8161
     */
    private String url;

    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "HELLO";

    //连接工厂
    private ConnectionFactory connectionFactory;

    //会话 接受或者发送消息的线程
    private Session session;

    //消息的目的地
    private Destination destination;

    //消息消费者
    private MessageConsumer messageConsumer;

    //与MQ的连接
    private Connection connection = null;

    public Consumer(String url, String username, String password) throws JMSException {
        this.url = url;
        this.username = username;
        this.password = password;
        this.init();
    }

    /**
     * 初始化方法
     * @throws JMSException 
     */
    private void init() throws JMSException {
        // 实例化工厂
        connectionFactory = new ActiveMQConnectionFactory(
                username, password, url);
        // 获取连接
        connection = connectionFactory.createConnection();
        // 启动
        connection.start();
        // 获取session
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 创建指定队列名的队列
        destination = session.createQueue(Consumer.QUEUE_NAME);
        // 创建消息消费者
        messageConsumer = session.createConsumer(destination);
    }

    /**
     * 接收消息
     * @throws JMSException 
     */
    public String receive() throws JMSException {
        TextMessage msg = (TextMessage) messageConsumer.receive(5000);
        if(msg == null) {
            return null;
        }
        return msg.getText();
    }

    public void close() throws JMSException {
        session.close();
        connection.close();
    }

}

生产者

创建生产者过程分析:

// 实例化工厂
        connectionFactory = new ActiveMQConnectionFactory(
                username, password, url);
        // 获取连接
        connection = connectionFactory.createConnection();
        // 启动
        connection.start();
        // 获取session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建指定队列名的队列
        destination = session.createQueue(Producer.QUEUE_NAME);
        // 创建消息生产者
        messageProducer = session.createProducer(destination);

ActiveMQ符合JMS规范,里面的接口都是jms接口。

1. 实例化工厂,指定使用ActiveMQ的实现类,传入用户名、密码和连接地址;

2. 使用工厂创建一个连接,类似数据库连接;

3. 启动连接;

4. 创建一个session,第一个参数的决定是否采用事务方式(如果采用事务方式,在发送完消息后需要关闭session,否则消息不能发送出去),第二个参数觉得应答模式,我们使用生产一次,消费一次,自动应答这种模式;

5. 使用session创建队列(主题-发布模式为topic);

6. 使用session创建消息生产者。

发送消息:

// 创建一个Message
TextMessage msg = session.createTextMessage(data);
// 通过消息发送者进行消息发送
this.messageProducer.send(msg);

通过session可以创建不同类型的消息,除了TextMessage,还可以创建MapMessage和普通的Message,这样就支持发送能序列化的Bean了。

消费者

消费者创建过程类似生产者,只是最终是生成MessageConsumer。

接收消息:

TextMessage msg = (TextMessage) messageConsumer.receive(5000);
if(msg == null) {
    return null;
}
return msg.getText();

从消息队列中接收文本消息,receive(5000)中的5000是超时时间,超过了5s就结束不再接收消息。如果不指定超时时间,将一直等下去,直到消息队列有消息为止。

项目地址:https://github.com/zgljl2012/activemq-learn

推荐阅读