ActiveMQ Study Notes 2

lanqie 2018-04-19 16:47

Basic steps:

            // Connect to the broker on its TCP transport.
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            Connection connection = factory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination que = new ActiveMQQueue("que");
            MessageProducer producer = session.createProducer(que);
            TextMessage msg = session.createTextMessage("hello activemq");
            producer.send(msg);
            // session.commit(); // only needed when the session is transacted
            session.close();
            connection.close();

Points to note

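1. Start the connection
connection.start();
2.1. Session type (transacted or non-transacted)
The first argument of connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
2.2. Acknowledgement mode
The second argument of connection.createSession(false, Session.AUTO_ACKNOWLEDGE)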

    int AUTO_ACKNOWLEDGE = 1;
    int CLIENT_ACKNOWLEDGE = 2;
    int DUPS_OK_ACKNOWLEDGE = 3;
    int SESSION_TRANSACTED = 0;

2.3. Valid combinations
false (non-transacted):
int AUTO_ACKNOWLEDGE = 1;
int CLIENT_ACKNOWLEDGE = 2;
int DUPS_OK_ACKNOWLEDGE = 3;
true (transacted):
int SESSION_TRANSACTED = 0;

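A transacted session must call session.commit() for its sends and receives to take effect.
A non-transacted session must not call session.commit(); doing so throws an IllegalStateException.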
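
The combination rules above can be written down as a small check. This is only a sketch: SessionModeSketch, effectiveAckMode and mayCommit are hypothetical names, and the constants are copied from the listing above so that it compiles without the JMS jar. It covers only the four standard constants and is not the library's own code.

```java
// Illustrative only: mirrors the javax.jms.Session constants listed above.
final class SessionModeSketch {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    /** Hypothetical helper: the acknowledgement mode a session would end up with. */
    static int effectiveAckMode(boolean transacted, int requested) {
        if (transacted) {
            // For a transacted session the second argument is ignored.
            return SESSION_TRANSACTED;
        }
        if (requested < AUTO_ACKNOWLEDGE || requested > DUPS_OK_ACKNOWLEDGE) {
            // SESSION_TRANSACTED (0) does not pair with transacted == false.
            throw new IllegalArgumentException(
                    "not valid for a non-transacted session: " + requested);
        }
        return requested;
    }

    /** Hypothetical helper: commit() is only legal on a transacted session. */
    static boolean mayCommit(boolean transacted) {
        return transacted;
    }
}
```

So createSession(true, Session.AUTO_ACKNOWLEDGE) still behaves as a transacted session, and the send only becomes visible to consumers after session.commit().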

Persistence

Messages are persistent by default: they are stored on disk, so they are not lost even if the broker goes down.

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

Configuring persistence:

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
public interface DeliveryMode {
    int NON_PERSISTENT = 1;
    int PERSISTENT = 2;
}

Asynchronous

Sending messages

            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            factory.setUseAsyncSend(true);

Non-persistent messages are sent asynchronously by default. To get a broker acknowledgement for every non-persistent message sent, force synchronous sends:

factory.setAlwaysSyncSend(true);
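
How the two factory flags and the delivery mode interact can be summarised as a decision rule. The following is a simplified sketch of my understanding, not the client's actual code: SendModeSketch and isAsyncSend are hypothetical names, and transactions and send timeouts are deliberately left out.

```java
// Illustrative only: a paraphrase of the rules described in these notes.
final class SendModeSketch {

    /**
     * Hypothetical helper: true if producer.send() would return without
     * waiting for the broker's acknowledgement.
     */
    static boolean isAsyncSend(boolean persistent,
                               boolean useAsyncSend,
                               boolean alwaysSyncSend) {
        if (alwaysSyncSend) {
            // setAlwaysSyncSend(true) forces a broker receipt for every send.
            return false;
        }
        // Non-persistent messages are async by default;
        // setUseAsyncSend(true) makes persistent sends async as well.
        return !persistent || useAsyncSend;
    }
}
```

The trade-off is throughput against certainty: an asynchronous send returns sooner, but the producer does not learn from that call whether the broker accepted the message.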

Receiving messages

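import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class MsgReceiver {
    public static void main(String[] args) {

        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            Connection connection = factory.createConnection();
            // Messages are only delivered to consumers once the connection is started.
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination que = new ActiveMQQueue("que");
            MessageConsumer consumer = session.createConsumer(que);
            // Register the listener; onMessage is called on a separate delivery thread.
            consumer.setMessageListener(new MsgListener());
            // session.commit(); // not allowed: the session is non-transacted
            // session.close();  // closing here would stop delivery to the listener
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

class MsgListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Ignore anything that is not a text message instead of failing on the cast.
        if (!(message instanceof TextMessage)) {
            return;
        }
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println(msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}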

With asynchronous receiving, do not close the session early; and a non-transacted session must not call commit().
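
One way to close cleanly without closing too early is to wait on a latch that the listener counts down. The sketch below shows only the timing pattern and runs without a broker: AsyncReceiveSketch, deliverAsync and receiveAll are hypothetical names, and a background thread stands in for the broker's delivery thread. With JMS, the countDown() call would sit in onMessage, and session.close() and connection.close() would follow the await.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: simulates asynchronous delivery with a plain thread.
final class AsyncReceiveSketch {

    /** Stand-in for the delivery thread: invokes the listener later, off the caller's thread. */
    static ExecutorService deliverAsync(List<String> messages, Consumer<String> listener) {
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        for (String m : messages) {
            deliveryThread.submit(() -> listener.accept(m));
        }
        return deliveryThread;
    }

    /** Waits until every message was handled (or the timeout expires), then releases resources. */
    static List<String> receiveAll(List<String> messages, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        ExecutorService deliveryThread = deliverAsync(messages, text -> {
            received.add(text);   // what onMessage would do with msg.getText()
            done.countDown();     // signal that one more message was handled
        });
        try {
            // Releasing resources before this wait would be the "closed too early" mistake.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Plays the role of session.close() and connection.close().
            deliveryThread.shutdown();
        }
        return new ArrayList<>(received);
    }
}
```

A long-running consumer would usually not count messages at all; it would keep the session open for the life of the process and close it from a shutdown hook instead.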
