首页 > 技术文章 > Apache ActiveMQ 实践 <一>

xwy6 2018-01-19 16:45 原文

一.下载最新版本 ActiveMq

       http://activemq.apache.org/activemq-5152-release.html,下载目录如下:

二.创建项目

1.普通项目

   添加 jar包   

2.Maven项目

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>

三.代码分析,生产者,消费者    //点对点模式

/**
* 消息生产者
*/
public class JMSProducer {

private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //链接的用户
   private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;  //链接的密码
   private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;  //链接的地址,消息总线
   private static final int SENDNUM=10;//发送消息的数目

public static void main(String[] args) {

ConnectionFactory connectionFactory; //链接工厂
Connection connection = null; //链接
Session session; //会话
Destination destination; //目的地
MessageProducer messageProducer; //消息生产者

//实例化链接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

try {
connection=connectionFactory.createConnection(); //创建链接
connection.start(); //启动链接
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建会话,设置确认方式
destination=session.createQueue("FirstQueue1");//创建消息队列
messageProducer=session.createProducer(destination); //创建目的地
sendMessage(session, messageProducer); //发送消息
session.commit(); //有事物,需要提交session
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

/**
* 发送消息
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("发送消息"+"ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}

 

//下面是消费者代码,大致相同,我将不同的部分添加了注释,使用监听器的方式,进行消费。

 

/**
*消息消费者
*/
public class JMSConsumer2 {

private static final String USERNAME=ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer messageConsumer;

connectionFactory=new ActiveMQConnectionFactory
(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);

try {
connection=connectionFactory.createConnection();
connection.start();
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("FirstQueue1"); //创建消息队列
messageConsumer=session.createConsumer(destination);//创建消费者
messageConsumer.setMessageListener(new Listener());//注册监听
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

 

//监听器类如下,监听器必须实现MessageListener接口

/**
*监听器
*
*/
public class Listener implements MessageListener{

@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("监听器:"+((TextMessage)message).getText());   //将消息类型将转为TextMessage,这只是消息类型的一种。
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

使用Apache提供的控制台进行测试。

在bin目录下有选择对应的版本进入,启动 activemq.bat ,如下则启动成功。(如果一闪而过,可能是java环境变量没配置)


 

浏览器访问:http://127.0.0.1:8161/admin/queues.jsp

 

 

//运行生产者代码,生产消息

//运行消费者代码消费消息

 

综合的消息队列中消息的总数,待消费的数量,消费者数量等数据都可以在控制台中观察到。

 

推荐阅读