首页 > 技术文章 > RocketMQ(4.8.0)——生产者

zuoyang 2021-02-07 16:30 原文

RocketMQ(4.8.0)——生产者

一、生产者概述

  发送消息的一方被称为生产者,它在整个RocketMQ的生产者和消费体中扮演角色如下:

基本概念:

生产者组: 一个逻辑概念,在使用生产者实例的时候需要制定一个组名。一个生产者组可以生产多个Topic的消息。

生产者实例:一个生产者组部署了多个进程,每个进程都可以称为一个生产者实例。

Topic:主题名字,一个Topic由若干Queue组成。

  RocketMQ 客户端中的生产者有两个独立实现类:org.apache.rocketmq.client.producer.defaultmqproducer(普通消息、顺序消息、单向消息、批量消息、延迟消息)org.apache.rocketmq.client.producer.transactionmqproducer(事务消息)

二、消息结构和消息类型

rocketmq-master/common/src/main/java/org/apache/rocketmq/common/message/Message.java (rocketmq-all-4.8.0)

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 package org.apache.rocketmq.common.message;
 18 
 19 import java.io.Serializable;
 20 import java.util.Arrays;
 21 import java.util.Collection;
 22 import java.util.HashMap;
 23 import java.util.Map;
 24 
 25 public class Message implements Serializable {
 26     private static final long serialVersionUID = 8445773977080406428L;
 27 
 28     private String topic;
 29     private int flag;
 30     private Map<String, String> properties;
 31     private byte[] body;
 32     private String transactionId;
 33 
 34     public Message() {
 35     }
 36 
 37     public Message(String topic, byte[] body) {
 38         this(topic, "", "", 0, body, true);
 39     }
 40 
 41     public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
 42         this.topic = topic;
 43         this.flag = flag;
 44         this.body = body;
 45 
 46         if (tags != null && tags.length() > 0)
 47             this.setTags(tags);
 48 
 49         if (keys != null && keys.length() > 0)
 50             this.setKeys(keys);
 51 
 52         this.setWaitStoreMsgOK(waitStoreMsgOK);
 53     }
 54 
 55     public Message(String topic, String tags, byte[] body) {
 56         this(topic, tags, "", 0, body, true);
 57     }
 58 
 59     public Message(String topic, String tags, String keys, byte[] body) {
 60         this(topic, tags, keys, 0, body, true);
 61     }
 62 
 63     public void setKeys(String keys) {
 64         this.putProperty(MessageConst.PROPERTY_KEYS, keys);
 65     }
 66 
 67     void putProperty(final String name, final String value) {
 68         if (null == this.properties) {
 69             this.properties = new HashMap<String, String>();
 70         }
 71 
 72         this.properties.put(name, value);
 73     }
 74 
 75     void clearProperty(final String name) {
 76         if (null != this.properties) {
 77             this.properties.remove(name);
 78         }
 79     }
 80 
 81     public void putUserProperty(final String name, final String value) {
 82         if (MessageConst.STRING_HASH_SET.contains(name)) {
 83             throw new RuntimeException(String.format(
 84                 "The Property<%s> is used by system, input another please", name));
 85         }
 86 
 87         if (value == null || value.trim().isEmpty()
 88             || name == null || name.trim().isEmpty()) {
 89             throw new IllegalArgumentException(
 90                 "The name or value of property can not be null or blank string!"
 91             );
 92         }
 93 
 94         this.putProperty(name, value);
 95     }
 96 
 97     public String getUserProperty(final String name) {
 98         return this.getProperty(name);
 99     }
100 
101     public String getProperty(final String name) {
102         if (null == this.properties) {
103             this.properties = new HashMap<String, String>();
104         }
105 
106         return this.properties.get(name);
107     }
108 
109     public String getTopic() {
110         return topic;
111     }
112 
113     public void setTopic(String topic) {
114         this.topic = topic;
115     }
116 
117     public String getTags() {
118         return this.getProperty(MessageConst.PROPERTY_TAGS);
119     }
120 
121     public void setTags(String tags) {
122         this.putProperty(MessageConst.PROPERTY_TAGS, tags);
123     }
124 
125     public String getKeys() {
126         return this.getProperty(MessageConst.PROPERTY_KEYS);
127     }
128 
129     public void setKeys(Collection<String> keys) {
130         StringBuffer sb = new StringBuffer();
131         for (String k : keys) {
132             sb.append(k);
133             sb.append(MessageConst.KEY_SEPARATOR);
134         }
135 
136         this.setKeys(sb.toString().trim());
137     }
138 
139     public int getDelayTimeLevel() {
140         String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
141         if (t != null) {
142             return Integer.parseInt(t);
143         }
144 
145         return 0;
146     }
147 
148     public void setDelayTimeLevel(int level) {
149         this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
150     }
151 
152     public boolean isWaitStoreMsgOK() {
153         String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
154         if (null == result)
155             return true;
156 
157         return Boolean.parseBoolean(result);
158     }
159 
160     public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
161         this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
162     }
163 
164     public void setInstanceId(String instanceId) {
165         this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);
166     }
167 
168     public int getFlag() {
169         return flag;
170     }
171 
172     public void setFlag(int flag) {
173         this.flag = flag;
174     }
175 
176     public byte[] getBody() {
177         return body;
178     }
179 
180     public void setBody(byte[] body) {
181         this.body = body;
182     }
183 
184     public Map<String, String> getProperties() {
185         return properties;
186     }
187 
188     void setProperties(Map<String, String> properties) {
189         this.properties = properties;
190     }
191 
192     public String getBuyerId() {
193         return getProperty(MessageConst.PROPERTY_BUYER_ID);
194     }
195 
196     public void setBuyerId(String buyerId) {
197         putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
198     }
199 
200     public String getTransactionId() {
201         return transactionId;
202     }
203 
204     public void setTransactionId(String transactionId) {
205         this.transactionId = transactionId;
206     }
207 
208     @Override
209     public String toString() {
210         return "Message{" +
211             "topic='" + topic + '\'' +
212             ", flag=" + flag +
213             ", properties=" + properties +
214             ", body=" + Arrays.toString(body) +
215             ", transactionId='" + transactionId + '\'' +
216             '}';
217     }
218 }
View Code

topic:主题名字,可以通过 RocketMQ Console 创建。

flag:目前没用。

properties:消息扩展信息,Tag、keys、延迟级别都保存在这里。

body:消息体,字节数组。需要主席生产者使用什么编码,消费者也必须使用相同的编码解码,否则会产生乱码。

setKeys():设置消息的key,多个key可以用MessageConst.KEY_SEPARATOR (空格)分隔或者直接用另一个重载方法。如果Broker中messageIndexEnable=true则会根据key创建消息的Hash索引,帮助用户进行快速查询。

1     public void setKeys(Collection<String> keys) {
2         StringBuffer sb = new StringBuffer();
3         for (String k : keys) {
4             sb.append(k);
5             sb.append(MessageConst.KEY_SEPARATOR);
6         }
7 
8         this.setKeys(sb.toString().trim());
9     }
View Code

settags():消息过滤的标记,用户可以订阅某个Topic的某些tag,这样broker只会把订阅了topic-tag的消息发送给消费者。

setDelayTimeLevel():设置延迟级别,延迟多久消费者可以消费。

putUserProperty():如果还有其他扩展信息,可以存放在这里。内部是一个Map,重复调用会覆盖旧值。

RocketMQ支持普通消息、分区有序消息、全局有序消息、延迟消息和事务消息。

普通消息:没有特殊功能的消息。普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消息都是并发进行的,单机性能可达到十万级别的TPS。

分区有序消息:与kafka中的分区类似,把一个Topic消息分为多个分区 "保存" 和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。

全局有序消息:如果把一个Topic的分区数设置为1,那么该Topic中的消息就是单分区,所有的消息都遵循FIFO(先进先出)的原则。

延迟消息:消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在RocketMQ中只需要在发送消息时设置延迟级别即可实现。

事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交(Commit)消费或者回滚(Rollback)消息优雅地实现分布式事务。

三、生产者高可用

  通常,我们希望不管Broker、Namesrv出现什么情况,发送消息都不要出现未知状态或者消息丢失。在消息发送的过程中,客户端、Broker、Namesrv都有可能发生服务器损坏、掉电等各种故障。当这些故障发生时,RocketMQ是怎么处理的呢?

  1、客户端保证

  第一种保证机制:重试机制。RocketMQ支持同步、异步发送,不管哪种方式都可以在配置失败后重试,如果单个Broker发生故障,重试会选择其他Broker保证消息正常发送。

  配置项retryTimesWhenSendFailed: 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。

同步发送的重试代码可以参考,D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java (rocketmq-all-4.8.0

  1     @org.jetbrains.annotations.Nullable
  2     private SendResult sendDefaultImpl(
  3         Message msg,
  4         final CommunicationMode communicationMode,
  5         final SendCallback sendCallback,
  6         final long timeout
  7     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  8         this.makeSureStateOK();
  9         Validators.checkMessage(msg, this.defaultMQProducer);
 10         final long invokeID = random.nextLong();
 11         long beginTimestampFirst = System.currentTimeMillis();
 12         long beginTimestampPrev = beginTimestampFirst;
 13         long endTimestamp = beginTimestampFirst;
 14         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 15         if (topicPublishInfo != null && topicPublishInfo.ok()) {
 16             boolean callTimeout = false;
 17             MessageQueue mq = null;
 18             Exception exception = null;
 19             SendResult sendResult = null;
 20             int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
 21             int times = 0;
 22             String[] brokersSent = new String[timesTotal];
 23             for (; times < timesTotal; times++) {
 24                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
 25                 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
 26                 if (mqSelected != null) {
 27                     mq = mqSelected;
 28                     brokersSent[times] = mq.getBrokerName();
 29                     try {
 30                         beginTimestampPrev = System.currentTimeMillis();
 31                         if (times > 0) {
 32                             //Reset topic with namespace during resend.
 33                             msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
 34                         }
 35                         long costTime = beginTimestampPrev - beginTimestampFirst;
 36                         if (timeout < costTime) {
 37                             callTimeout = true;
 38                             break;
 39                         }
 40 
 41                         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
 42                         endTimestamp = System.currentTimeMillis();
 43                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
 44                         switch (communicationMode) {
 45                             case ASYNC:
 46                                 return null;
 47                             case ONEWAY:
 48                                 return
 49                                         ..........null;
 50                             case SYNC:
 51                                 if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
 52                                     if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
 53                                         continue;
 54                                     }
 55                                 }
 56 
 57                                 return sendResult;
 58                             default:
 59                                 break;
 60                         }
 61                     } catch (RemotingException e) {
 62                         endTimestamp = System.currentTimeMillis();
 63                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 64                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 65                         log.warn(msg.toString());
 66                         exception = e;
 67                         continue;
 68                     } catch (MQClientException e) {
 69                         endTimestamp = System.currentTimeMillis();
 70                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 71                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 72                         log.warn(msg.toString());
 73                         exception = e;
 74                         continue;
 75                     } catch (MQBrokerException e) {
 76                         endTimestamp = System.currentTimeMillis();
 77                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 78                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 79                         log.warn(msg.toString());
 80                         exception = e;
 81                         switch (e.getResponseCode()) {
 82                             case ResponseCode.TOPIC_NOT_EXIST:
 83                             case ResponseCode.SERVICE_NOT_AVAILABLE:
 84                             case ResponseCode.SYSTEM_ERROR:
 85                             case ResponseCode.NO_PERMISSION:
 86                             case ResponseCode.NO_BUYER_ID:
 87                             case ResponseCode.NOT_IN_CURRENT_UNIT:
 88                                 continue;
 89                             default:
 90                                 if (sendResult != null) {
 91                                     return sendResult;
 92                                 }
 93 
 94                                 throw e;
 95                         }
 96                     } catch (InterruptedException e) {
 97                         endTimestamp = System.currentTimeMillis();
 98                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
 99                         log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
100                         log.warn(msg.toString());
101 
102                         log.warn("sendKernelImpl exception", e);
103                         log.warn(msg.toString());
104                         throw e;
105                     }
106                 } else {
107                     break;
108                 }
109             }
110 
111             if (sendResult != null) {
112                 return sendResult;
113             }
114 
115             String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
116                 times,
117                 System.currentTimeMillis() - beginTimestampFirst,
118                 msg.getTopic(),
119                 Arrays.toString(brokersSent));
120 
121             info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
122 
123             MQClientException mqClientException = new MQClientException(info, exception);
124             if (callTimeout) {
125                 throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
126             }
127 
128             if (exception instanceof MQBrokerException) {
129                 mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
130             } else if (exception instanceof RemotingConnectException) {
131                 mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
132             } else if (exception instanceof RemotingTimeoutException) {
133                 mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
134             } else if (exception instanceof MQClientException) {
135                 mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
136             }
137 
138             throw mqClientException;
139         }
View Code

异步发送重试代码可以参考,D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\MQClientAPIImpl.java (rocketmq-all-4.8.0)

 1     private void sendMessageAsync(
 2         final String addr,
 3         final String brokerName,
 4         final Message msg,
 5         final long timeoutMillis,
 6         final RemotingCommand request,
 7         final SendCallback sendCallback,
 8         final TopicPublishInfo topicPublishInfo,
 9         final MQClientInstance instance,
10         final int retryTimesWhenSendFailed,
11         final AtomicInteger times,
12         final SendMessageContext context,
13         final DefaultMQProducerImpl producer
14     ) throws InterruptedException, RemotingException {
15         final long beginStartTime = System.currentTimeMillis();
16         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
17             @Override
18             public void operationComplete(ResponseFuture responseFuture) {
19                 long cost = System.currentTimeMillis() - beginStartTime;
20                 RemotingCommand response = responseFuture.getResponseCommand();
21                 if (null == sendCallback && response != null) {
22 
23                     try {
24                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
25                         if (context != null && sendResult != null) {
26                             context.setSendResult(sendResult);
27                             context.getProducer().executeSendMessageHookAfter(context);
28                         }
29                     } catch (Throwable e) {
30                     }
31 
32                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
33                     return;
34                 }
35 
36                 if (response != null) {
37                     try {
38                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
39                         assert sendResult != null;
40                         if (context != null) {
41                             context.setSendResult(sendResult);
42                             context.getProducer().executeSendMessageHookAfter(context);
43                         }
44 
45                         try {
46                             sendCallback.onSuccess(sendResult);
47                         } catch (Throwable e) {
48                         }
49 
50                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
51                     } catch (Exception e) {
52                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
53                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
54                             retryTimesWhenSendFailed, times, e, context, false, producer);
55                     }
56                 } else {
57                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
58                     if (!responseFuture.isSendRequestOK()) {
59                         MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
60                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
61                             retryTimesWhenSendFailed, times, ex, context, true, producer);
62                     } else if (responseFuture.isTimeout()) {
63                         MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
64                             responseFuture.getCause());
65                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
66                             retryTimesWhenSendFailed, times, ex, context, true, producer);
67                     } else {
68                         MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
69                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
70                             retryTimesWhenSendFailed, times, ex, context, true, producer);
71                     }
72                 }
73             }
74         });
75     }
View Code

  通信是在通信层异步发送发完成的,当operationComplete()方法返回的response值为null时,会重新执行重试代码。返回值response为null通常是因为客户端收到TCP请求解包失败,或者没有找到匹配的request。

  生产者配置项RetryTimesWhenSendAsyncFailed表示异步重试的次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢,默认为2次,加上正常发送的1次,总共有3次发送机会。

  第二种保证机制:客户端容错。RocketMQ Client会维护一个 "Broker-发送延迟" 关系,根据这个关系选择一个发送延迟级别较低的 Broker 来发送消息,这样能最大限度地利用 Broker 的能力,剔除已经宕机、不可用或者发送延迟级别较高的 Broker,尽量保证消息的正常发送。

  这种机制主要体现在发送消息时如何选择Queue,源代码在 D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\latency\MQFaultStrategy.java  (rocketmq-all-4.8.0

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 
 18 package org.apache.rocketmq.client.latency;
 19 
 20 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 21 import org.apache.rocketmq.client.log.ClientLogger;
 22 import org.apache.rocketmq.logging.InternalLogger;
 23 import org.apache.rocketmq.common.message.MessageQueue;
 24 
 25 public class MQFaultStrategy {
 26     private final static InternalLogger log = ClientLogger.getLog();
 27     private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
 28 
 29     private boolean sendLatencyFaultEnable = false;
 30 
 31     private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
 32     private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
 33 
 34     public long[] getNotAvailableDuration() {
 35         return notAvailableDuration;
 36     }
 37 
 38     public void setNotAvailableDuration(final long[] notAvailableDuration) {
 39         this.notAvailableDuration = notAvailableDuration;
 40     }
 41 
 42     public long[] getLatencyMax() {
 43         return latencyMax;
 44     }
 45 
 46     public void setLatencyMax(final long[] latencyMax) {
 47         this.latencyMax = latencyMax;
 48     }
 49 
 50     public boolean isSendLatencyFaultEnable() {
 51         return sendLatencyFaultEnable;
 52     }
 53 
 54     public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
 55         this.sendLatencyFaultEnable = sendLatencyFaultEnable;
 56     }
 57 
 58     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 59         if (this.sendLatencyFaultEnable) {
 60             try {
 61                 /* 第1步,获取一个在延迟上可接受,并且和上次发送相同的Broker。
 62                 首先获取一个自增序号index,通过取模获取Queue的位置下标Pos。
 63                 如果Pos对应的Broker的延迟时间是可以接受的,并且是第一次发送,或者和上次发送的Broker相同,则将Queue返回。
 64                 */
 65                 int index = tpInfo.getSendWhichQueue().getAndIncrement();
 66                 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
 67                     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
 68                     if (pos < 0)
 69                         pos = 0;
 70                     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
 71                     if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
 72                         return mq;
 73                 }
 74                 // 第2步,如果第1步没有选择一个Broker,则选择一个延迟较低的Broker
 75                 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
 76                 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
 77                 if (writeQueueNums > 0) {
 78                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
 79                     if (notBestBroker != null) {
 80                         mq.setBrokerName(notBestBroker);
 81                         mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
 82                     }
 83                     return mq;
 84                 } else {
 85                     latencyFaultTolerance.remove(notBestBroker);
 86                 }
 87             } catch (Exception e) {
 88                 log.error("Error occurred when selecting message queue", e);
 89             }
 90             // 第3步,如果第1、第2步都没有选中一个Broker,则随机选择一个Broker。
 91             return tpInfo.selectOneMessageQueue();
 92         }
 93 
 94         return tpInfo.selectOneMessageQueue(lastBrokerName);
 95     }
 96 
 97     public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
 98         if (this.sendLatencyFaultEnable) {
 99             long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
100             this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
101         }
102     }
103 
104     private long computeNotAvailableDuration(final long currentLatency) {
105         for (int i = latencyMax.length - 1; i >= 0; i--) {
106             if (currentLatency >= latencyMax[i])
107                 return this.notAvailableDuration[i];
108         }
109 
110         return 0;
111     }
112 }
View Code

 sendLatencyFaultEnable: 发送延迟容错开关,默认为关闭,如果开关打开了,会触发发送延迟容错机制来选择发送Queue。

上面代码里包括一个随机选择方法tpInfo.selectOneMessageQueue(lastBrokerName),该方法的功能就是随机一个Broker,具体实现如下:
D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\TopicPublishInfo.java (rocketmq-all-4.8.0)

 1     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
 2         // 第1步,如果没有上次使用Broker作为参考,那么随机选择一个Broker
 3         if (lastBrokerName == null) {
 4             return selectOneMessageQueue();
 5         } else {
 6             // 第2步,如果存在上次使用的Broker,就选择上次使用的Broker,目的是均匀分散Broker的压力。
 7             for (int i = 0; i < this.messageQueueList.size(); i++) {
 8                 int index = this.sendWhichQueue.getAndIncrement();
 9                 int pos = Math.abs(index) % this.messageQueueList.size();
10                 if (pos < 0)
11                     pos = 0;
12                 MessageQueue mq = this.messageQueueList.get(pos);
13                 if (!mq.getBrokerName().equals(lastBrokerName)) {
14                     return mq;
15                 }
16             }
17             // 第3步,如果第1、第2步都没有选中一个Broker,则采用兜底方案——随机选择一个Broker
18             return selectOneMessageQueue();
19         }
20     }
View Code

  客户端在发送消息后,会调用 updateFaultItem()方法来更新当前接受消息的Broker的延迟情况,这些主要逻辑都在 MQFaultStrategy类中实现,延迟策略有一个标准接口 LatencyFaultTolerance。

  2、Broker端保证
  Broker 数据同步方式保证:

  1. 同步复制:指消息发送到Master Broker后,同步到Slave Broker才算发送成功;
  2. 异步复制:指消息发送到Master Broker后,即为发送成功。

在生产环境中,建议至少部署2个Master和2个Slave。

部署方式 优点 缺点 备注
单个Master模式 简单
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用
不建议线上环境使用,可用于本地测试
多个Master模式 配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高。 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会收到影响。 当使用多master无slave的集群搭建方式时,master的brokerRole配置必须为ASYNC_MASTER。如果配置为SYNC_MASTER,则producer发送消息时,返回值的SendStatus会一直是SLAVE_NOT_AVAILABLE。
多Master多Slave模式——异步复制 即使磁盘损坏,消息丢失的非常少,但消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样。 Master宕机,磁盘损坏情况,会丢失少量信息。  
多Master多Slave模式——同步双写 数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高; 性能比异步复制模式稍低,大约低10%左右,发送单个消息的RT会稍高,目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。  

 

 

推荐阅读