首页 > 技术文章 > 消息队列——RocketMQ

vettel0329 2019-05-10 17:44 原文

1.安装及启动

  a.在官网 rocketmq.apache.org/release_notes/release-notes-4.4.0/ 下载并解压RocketMQ

  b.配置环境变量:

    变量名:ROCKETMQ_HOME

    变量值:E:\java\rocketmq-4.4.0  (RocketMQ路径)

  c.启动NAMESERVER:

    在MQ的 bin 目录下执行:start mqnamesrv.cmd

  d.启动BROKER:

    在MQ的 bin 目录下执行:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  e.注意:若提示 "错误: 找不到或无法加载主类" ,请不要将RocketMQ解压到含空格的文件夹中(如:"Program Files"文件夹),或者打开 runbroker.cmd 将‘%CLASSPATH%’加上英文双引号

 

 

2.部署RocketMQ管理插件

  a.在 https://github.com/apache/rocketmq-externals.git 下载 rocketmq-console 工程

  b.修改 rocketmq-console\src\main\resources 中的 application.properties 配置

#端口号
server.port=8081

.....

#MQ服务的IP地址和端口号
rocketmq.config.namesrvAddr=127.0.0.1:9876

.....

  c.在“rocketmq-console”目录下,执行‘mvn clean package -Dmaven.test.skip=true’,用maven编译打包

  d.进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.1.jar’

  e.浏览器中输入‘127.0.0.1:8081’,成功后即可查看

 

3.使用

  a.导入依赖

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
    </dependency>

 

 

  b.简单消息发送

    消费者

/**
 * Minimal push consumer: subscribes to topic "TopicTest" with the tag expression "*"
 * and prints every batch of messages handed to its listener.
 */
public class Consumer {

    /**
     * Configures the consumer, registers a concurrent message listener and starts it.
     *
     * @param args unused
     * @throws InterruptedException kept from the original signature so existing callers still compile
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Create the consumer for its consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("uniqueGroupName");
        // Address of the name server
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic
        consumer.subscribe("TopicTest", "*");
        // Register the callback invoked for each batch of messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String threadName = Thread.currentThread().getName();
                System.out.println("[" + threadName + "]" + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default; the
                    // producers in this article encode with RemotingHelper.DEFAULT_CHARSET
                    // (assumed to be UTF-8). Fully qualified because no import section is visible.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("[" + threadName + "]接收到数据: " + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

 

    同步发送生产者

/**
 * Synchronous producer: sends 100 messages to topic "TopicTest" with tag "TagA"
 * and prints the result returned by each send call.
 */
public class SyncProducer {

    /**
     * Starts the producer, sends the messages one by one and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Create the producer for its producer group
        DefaultMQProducer producer = new DefaultMQProducer("uniqueGroupName");
        // Address of the name server
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Build the message: topic, tag and body
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send and wait for the result
                SendResult sendResult = producer.send(msg);
                System.out.println("[" + i + "]" + sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started producer is always released
            producer.shutdown();
        }
    }
}

 

    异步发送生产者

/**
 * Asynchronous producer: sends 100 messages to topic "TopicTest" and reports the outcome
 * of each send from a {@code SendCallback}.
 */
public class AsyncProducer {

    /**
     * Starts the producer, issues the asynchronous sends, waits for their callbacks
     * (for at most 10 seconds) and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or submitting a send fails,
     *                   or if the wait for the callbacks is interrupted
     */
    public static void main(String[] args) throws Exception {
        final int messageCount = 100;
        // Create the producer for its producer group
        DefaultMQProducer producer = new DefaultMQProducer("uniqueGroupName");
        // Address of the name server
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        // Number of retries when an asynchronous send fails
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // One count per message, released by whichever callback fires for it.
        // Fully qualified because no import section is visible for this snippet.
        final java.util.concurrent.CountDownLatch pending = new java.util.concurrent.CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                final int index = i;
                // Build the message: topic, tag, key and body
                Message msg = new Message("TopicTest", "TagA", "OrderID" + index, ("Hello world " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("[" + index + "]成功:" + sendResult.getMsgId());
                        pending.countDown();
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.println("[" + index + "]异常:" + e);
                        e.printStackTrace();
                        pending.countDown();
                    }
                });
            }
            // Wait until every callback has run instead of always sleeping a fixed 10 seconds;
            // the 10-second cap keeps the original upper bound on the wait.
            pending.await(10, java.util.concurrent.TimeUnit.SECONDS);
        } finally {
            // Shut down even when submitting a send fails
            producer.shutdown();
        }
    }

}

 

    单向发送生产者

/**
 * One-way producer: sends 100 messages to topic "TopicTest" with {@code sendOneway},
 * which returns no send result.
 */
public class OnewayProducer {

    /**
     * Starts the producer, fires the one-way sends and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception{
        // Create the producer for its producer group
        DefaultMQProducer producer = new DefaultMQProducer("uniqueGroupName");
        // Address of the name server
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Build the message: topic, tag and body
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send without receiving any result
                producer.sendOneway(msg);
            }
            // NOTE(review): sendOneway presumably does not wait for the broker, so confirm
            // that shutting down immediately cannot drop messages that are still in flight.
        } finally {
            // Shut down even when a send fails, so the started producer is always released
            producer.shutdown();
        }
    }

}

 

 

  c.顺序消息发送

    顺序消息消费者

/**
 * Orderly push consumer: subscribes to topic "TopicTestjjj" for tags TagA..TagE and
 * prints every batch delivered to its {@code MessageListenerOrderly}.
 */
public class OrderedConsumer {

    /**
     * Configures the consumer, registers an orderly message listener and starts it.
     *
     * @param args unused
     * @throws Exception if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        // Create the consumer for its consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("exampleGroupName");
        // Address of the name server
        consumer.setNamesrvAddr("localhost:9876");
        // Start consuming from the first offset of each queue
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the topic, restricted to the listed tags
        consumer.subscribe("TopicTestjjj", "TagA || TagB || TagC || TagD || TagE");
        // Register the callback invoked for each batch of messages
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // Auto-commit is left at its default (enabled). With auto-commit switched off,
                // returning SUCCESS does not commit the consumed offset in the RocketMQ client,
                // so the same messages would be delivered again after a restart.
                String threadName = Thread.currentThread().getName();
                System.out.println("[" + threadName + "]" + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default; the
                    // producer encodes with RemotingHelper.DEFAULT_CHARSET (assumed UTF-8).
                    // Fully qualified because no import section is visible for this snippet.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("[" + threadName + "]接收到数据: " + body);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("消费者启动");
    }

}

 

    顺序消息生产者

/**
 * Ordered producer: sends 50 messages to topic "TopicTestjjj", choosing the target queue
 * from an order id so that messages sharing an order id go to the same queue.
 */
public class OrderedProducer {

    /**
     * Starts the producer, sends the messages through a queue selector and shuts down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Create the producer for its producer group
        DefaultMQProducer producer = new DefaultMQProducer("exampleGroupName");
        // Address of the name server
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        try {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            // The selector is stateless, so one instance is shared by all sends
            // instead of allocating a new one per message.
            MessageQueueSelector byOrderId = new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // arg is the orderId passed to send(); map it onto the available queues
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            };
            for (int i = 0; i < 50; i++) {
                // Order id: 0 for even i, 1 for odd i
                int orderId = i % 2;
                // Build the message: topic, tag, key and body
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, byOrderId, orderId);

                System.out.println("[" + i + "]" + sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started producer is always released
            producer.shutdown();
        }
    }

}

    注:这里按 orderId(0 或 1)把消息分别发送到两个队列,从而分别保证偶数序号和奇数序号消息各自的顺序

 

 

  d.广播

    广播生产者(同普通生产者)

/**
 * Producer for the broadcast example: sends 100 messages to topic "TopicTest" with tag "TagA"
 * and prints the result of each send.
 */
public class BroadcastProducer {

    /**
     * Starts the producer, sends the messages and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Build the message: topic, tag, key and body
                Message msg = new Message("TopicTest", "TagA", "OrderID" + i, ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.println("[" + i + "]" + sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started producer is always released
            producer.shutdown();
        }
    }
}

 

    广播消费者

/**
 * Broadcast-mode push consumer: subscribes to topic "TopicTest" for tags TagA, TagC and TagD
 * with the message model set to {@code MessageModel.BROADCASTING}, and prints every batch
 * handed to its listener.
 */
public class BroadcastConsumer {

    /**
     * Configures the consumer for broadcasting, registers a concurrent listener and starts it.
     *
     * @param args unused
     * @throws Exception if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("exampleGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        // Start consuming from the first offset of each queue
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Switch to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String threadName = Thread.currentThread().getName();
                System.out.println("[" + threadName + "]:" + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default; the
                    // producer encodes with RemotingHelper.DEFAULT_CHARSET (assumed UTF-8).
                    // Fully qualified because no import section is visible for this snippet.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("[" + threadName + "]接收到数据: " + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动");
    }
}

 

 

  e.过滤

    ------------
    | message  |
    |----------|  a > 5 AND b = 'abc'
    | a = 10   |  --------------------> Gotten
    | b = 'abc'|
    | c = true |
    ------------
    ------------
    | message  |
    |----------|   a > 5 AND b = 'abc'
    | a = 1    |  --------------------> Missed
    | b = 'abc'|
    | c = true |
    ------------

    ①数字比较:>, >=, <, <=, BETWEEN, =

    ②字符串比较:=, <>, IN

    ③IS NULL, IS NOT NULL

    ④逻辑运算:AND, OR, NOT

 

    注:要使用 SQL92 过滤,需要在MQ的 conf 目录下的 broker.conf 中加入以下配置

enablePropertyFilter = true

    再回到bin目录重启broker

start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

 

    生产者

/**
 * Producer for the filter example: sends 10 messages to topic "TopicTest", each carrying
 * a user property "a" whose value is the message index.
 */
public class FilterProducer {

    /**
     * Starts the producer, sends the messages with their user property and shuts down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a body or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("uniqueGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Attach the user property "a" (the loop index as a string)
                msg.putUserProperty("a", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
                System.out.println("[" + i + "]" + sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started producer is always released
            producer.shutdown();
        }
    }

}

 

    消费者

/**
 * Filtering push consumer: subscribes to topic "TopicTest" with the SQL selector
 * "a between 0 and 3" and prints every batch handed to its listener.
 */
public class FilterConsumer {

    /**
     * Configures the consumer with an SQL message selector, registers a listener and starts it.
     *
     * @param args unused
     * @throws Exception if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("uniqueGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        // Only subscribe to messages whose property "a" is between 0 and 3
        consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String threadName = Thread.currentThread().getName();
                System.out.println("[" + threadName + "]" + msgs);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default; the
                    // producer encodes with RemotingHelper.DEFAULT_CHARSET (assumed UTF-8).
                    // Fully qualified because no import section is visible for this snippet.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("[" + threadName + "]接收到数据: " + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动");
    }

}

 

    

   f.事务(基于二阶段分布式事务)

/**
 * Transactional producer: sends 10 messages to topic "TopicTest1234" with
 * {@code sendMessageInTransaction}, passing the message index as the argument that is
 * handed to the {@code TransactionListener}.
 */
public class TransactionProducer {

    /**
     * Wires the transaction listener and executor into the producer, sends the messages,
     * then keeps the process alive before shutting down.
     *
     * @param args unused
     * @throws MQClientException if starting the producer fails
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("uniqueGroupName");
        producer.setNamesrvAddr("localhost:9876");
        // Pool handed to the producer: 2 core threads, at most 5, idle extra threads retire
        // after 100 seconds, and up to 2000 tasks may wait in the queue.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        try {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, i);
                    // Report the result in the same format as the other producers in this
                    // article; previously the result was assigned but never used.
                    System.out.println("[" + i + "]" + sendResult);

                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    // A failed send is reported and the loop moves on to the next message
                    e.printStackTrace();
                }
            }

            // Keep the process alive; presumably this leaves time for the listener's
            // checkLocalTransaction callbacks to be invoked.
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
        } finally {
            // Release the producer and the pool even when the main thread is interrupted.
            try {
                producer.shutdown();
            } finally {
                // ExecutorService.shutdown() is idempotent, so this is harmless if the
                // producer has already stopped the pool.
                executorService.shutdown();
            }
        }
    }

}
/**
 * Demo transaction listener. It remembers the argument passed with each transactional
 * message, keyed by transaction id, and derives the transaction state purely from that
 * number so that every outcome can be observed.
 */
public class TransactionListenerImpl implements TransactionListener {

    /** Argument of each transaction seen by {@link #executeLocalTransaction}, keyed by transaction id. */
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();


    /**
     * Records the argument for the message's transaction and returns a state chosen from it:
     * 0-5 yield {@code UNKNOW}, 6-7 yield {@code ROLLBACK_MESSAGE}, anything else
     * {@code COMMIT_MESSAGE}.
     *
     * @param msg the message being sent
     * @param arg the argument given to {@code sendMessageInTransaction}; must be an {@code Integer}
     * @return the state selected for {@code arg}
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = (int) arg;
        localTrans.put(msg.getTransactionId(), value);
        switch (value) {
            case 0:
            case 1:
            case 2:
            case 3:
            case 4:
            case 5:
                // TODO local work succeeded, but follow-up work (e.g. downstream business
                // operations) still has to happen before the outcome is known
                return report("execute", value, LocalTransactionState.UNKNOW, msg.getTransactionId(), msg.getBody());
            case 6:
            case 7:
                // TODO local work failed: the message is not sent
                return report("execute", value, LocalTransactionState.ROLLBACK_MESSAGE, msg.getTransactionId(), msg.getBody());
            default:
                // TODO local work succeeded: the message is sent
                return report("execute", value, LocalTransactionState.COMMIT_MESSAGE, msg.getTransactionId(), msg.getBody());
        }
    }


    /**
     * Looks up the argument recorded for the message's transaction and returns a state chosen
     * from it: 0-1 yield {@code UNKNOW}, 2-3 yield {@code ROLLBACK_MESSAGE}, anything else
     * {@code COMMIT_MESSAGE}. A transaction id that was never recorded yields
     * {@code ROLLBACK_MESSAGE} without printing anything.
     *
     * @param msg the message whose transaction state is being checked
     * @return the state selected for the recorded argument
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer value = localTrans.get(msg.getTransactionId());
        if (null == value) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        switch (value) {
            case 0:
            case 1:
                // TODO still cannot tell whether the follow-up work succeeded; keep checking
                return report("check", value, LocalTransactionState.UNKNOW, msg.getTransactionId(), msg.getBody());
            case 2:
            case 3:
                // TODO the follow-up work turned out to have failed: the message is not sent
                return report("check", value, LocalTransactionState.ROLLBACK_MESSAGE, msg.getTransactionId(), msg.getBody());
            default:
                // TODO the follow-up work turned out to have succeeded: the message is sent
                return report("check", value, LocalTransactionState.COMMIT_MESSAGE, msg.getTransactionId(), msg.getBody());
        }
    }

    /**
     * Prints one log line for a decision and returns the decided state unchanged.
     *
     * @param phase "execute" or "check"
     * @param value the argument recorded for the transaction
     * @param state the state that was decided
     * @param transactionId the id of the transaction
     * @param body the raw message body
     * @return {@code state}
     */
    private static LocalTransactionState report(String phase, int value, LocalTransactionState state, String transactionId, byte[] body) {
        // Decode with an explicit charset instead of the platform default; the producer
        // encodes with RemotingHelper.DEFAULT_CHARSET (assumed UTF-8). Fully qualified
        // because no import section is visible for this snippet.
        String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("[" + phase + value + "] " + state.name() + ", transactionId[" + transactionId + "], msg[" + text + "]");
        return state;
    }
}

 

 

  参考文档:

    https://www.jianshu.com/p/4a275e779afa

    https://www.jianshu.com/p/d8a21ab2c2d3

    http://rocketmq.apache.org/docs/simple-example/

推荐阅读