首页 > 技术文章 > RocketMQ源码分析 producer启动以及消息发送流程,producer与broker网络交互过程,发送和接收方式总结

zhangyjblogs 2020-12-20 14:13 原文

1.proucer发送消息本质就是把消息通过网络发送给服务器(broker),broker接收到消息存储应答producer成功。

要发送的消息在producer包装为Message,到了broker端变为MessageExtBrokerInner,producer客户端的启动和发送比较简单,贴个大图

20191115020919255

上图就是producer的启动以及消息的发送。

生产中producer发送消息通常采用同步发送,如果消息大则采用异步发送(消息大的情况比较少) 。

本质producer就是个netty client,它的主要功能就是连接上namesvr:9876,获取到broker、topic、queue等信息缓存到producer本地,实际缓存到DefaultMQProducerImpl.topicPublishInfoTable,然后发送消息的时候选择一个broker和消息队列进行发送。

这里主要说下rmq消息发送的设计。

rmq对于client发送消息,都会最终包装为一个远程命令RemotingCommand,这是个命令模式。

RemotingCommand.code存放的是发送的命令,broker收到后知道客户端要做什么

RemotingCommand.body 存放的是真实要发送的消息。

RemotingCommand.customHeader,是自定义的命令头,是CommandCustomHeader

CommandCustomHeader有许多具体子类,比如producer发送消息是SendMessageRequestHeader,消费端拉取消息是PullMessageRequestHeader,CommandCustomHeader存放自定义的消息信息,比如SendMessageRequestHeader存放的是topic、queueid。

code、body、customHeader组成了RemotingCommand是发送数据。在broker收到消息后,解析RemotingCommand,根据命令进入不同的处理器处理,broker响应消息也是放在RemotingCommand返回。

producer发送消息到broker之间的交互:

20191115000923112 (1)

producer启动的大图中已经写了client server端的inboud outbound处理器,它们的执行顺序参考上图,除了对RemotingCommand编解码的NettyEncoder NettyDecoder,重要的就是图中标注绿色的方法。在server端决定由哪个线程池处理processor,在client端决定处理响应结果,对于同步调用唤醒等待发送线程,对于异步调用,执行回调结果。

重点说下org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(ChannelHandlerContext, RemotingCommand)方法,调用堆栈如图,正好分别是客户端和服务端的inbound事件中调用。

20191115001738137

对于服务端,接收到的是请求,执行NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand),根据RemotingCommand.code即请求命令从缓存NettyRemotingAbstract.processorTable获取对应的执行处理器和线程池,那么processorTable是怎么有值的呢?对于服务端是在broker启动过程中添加的命令和处理器、线程池,查看堆栈如下

20191115002213227

对于client端,是在客户端启动时候添加处理器

20191115002631897

服务端处理请求NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)

把pair.getObject1()即处理器(比如SendMessageProcessor)保存到新键的task中,然后把task提交到pair.getObject2()即线程池中,这样就可以并发处理客户端请求。task被提交线程池后就会执行,继而执行对应的处理器(比如SendMessageProcessor)而且对于所有的处理器都是使用同一个这样模式,这样设计的很巧妙。

客户端处理响应NettyRemotingAbstract.processResponseCommand(ChannelHandlerContext, RemotingCommand)

客户端同步发送

发送入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(Channel, RemotingCommand, long)方法,创建一个ResponseFuture保存到NettyRemotingAbstract.responseTable并发集合,然后发送数据到broker,接着ResponseFuture同步等待broker响应结果。看如下代码

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();//每次请求都生成个唯一键,用于匹配原ResponseFuture
 
    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);//发送前创建ResponseFuture,功能就类似jdk的future
        this.responseTable.put(opaque, responseFuture);//把ResponseFuture保存到responseTable集合
        final SocketAddress addr = channel.remoteAddress();//服务器地址
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {//writeAndFlush就是发送,发送成功后执行ChannelFutureListener监听器,注意发送成功不代表就接收到了响应,只是表示数据已经发送出去了
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
            	//监听器实际是由netty的线程执行,而非当前发送线程,因此发生成功直接return,并不会进入到finally。发送失败也不会进入finally。如果不懂得netty源码,这里代码容易让人混乱的
                if (f.isSuccess()) {//发送成功回调
                    responseFuture.setSendRequestOK(true);//成功则返回,responseTable存在该responseFuture,在线程[ServerHouseKeepingService]执行NettyRemotingAbstract.scanResponseTable()进行处理
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);//发送失败回调
                }
				//发送失败,就不需要处理响应了,因此把ResponseFuture从responseTable集合移除
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
 
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//当前线程等待timeoutMillis时间被唤醒
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());//接收broker响应超时
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());//发送失败
            }
        }
 
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);//如果超时or发送失败,则把ResponseFuture从responseTable集合移除
    }
}

接收入口就是NettyRemotingAbstract.processResponseCommand(ChannelHandlerContext, RemotingCommand),从responseTable并发集合根据opaque(每次请求都会生成一个唯一值,用来查找匹配原请求)取出发送之前保存的ResponseFuture,把broker响应结果保存到ResponseFuture并唤醒发送线程。

/*
     * 处理响应结果,通过opaque匹配到原ResponseFuture,对于同步则唤醒等待线程,对于异步则执行回调。
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();//opaque是每次请求都生成一个,该值是递增的,根据该唯一值匹配到原ResponseFuture
        final ResponseFuture responseFuture = responseTable.get(opaque);//发送之前会把ResponseFuture保存到responseTable集合,这样在处理响应的时候就可以获取到
        if (responseFuture != null) {//说明ResponseFuture还没被scanResponseTable()操作从集合中移除
            responseFuture.setResponseCommand(cmd);//把响应结果保存到responseFuture
 
            responseTable.remove(opaque);//移除,如果不移除就内存泄漏了
 
            if (responseFuture.getInvokeCallback() != null) {//异步调用才有回调
                executeInvokeCallback(responseFuture);//异步发送执行这里
            } else {
                responseFuture.putResponse(cmd);//唤醒同步发送的等待线程
                responseFuture.release();//同步发送,无功能,因为ResponseFuture.once==null
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

这里就有个问题,如果客户端发送很快,但是服务端响应很慢或者不响应,同步调用大量超时,这样就导致NettyRemotingAbstract.responseTable集合不断增加(因为每次调用都向该集合添加一个ResponseFuture),最终会导致oom,这里就有可能内存泄漏了,怎么解决呢?在本篇最开始贴的大图中,有个计划线程每1s执行一次NettyRemotingAbstract.scanResponseTable(),在该方法内,会把超时的ResponseFuture从NettyRemotingAbstract.responseTable集合移除,这样就不会导致内存泄漏了。

客户端异步调用

rmq4.4版本是没有异步发送,入口org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(Message, SendCallback, long)见图

2019111501510111

执行进入到org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(Message, CommunicationMode, SendCallback, long)的时候发生模式是异步直接return null。

异步发送暂时无法分析了。不过也能猜到大概,发送后直接返回,服务端什么时候处理完毕了,吧响应返回,这样客户端在接收响应(在netty inbound事件)得到了ResponseFuture,然后调用回调即可。具体异步发送是有什么bug呢才被去除呢?暂时不清楚

客户端oneway调用

oneway是只发不收,因此就没有ResponseFuture。主要用于非业务型请求,比如REGISTER_BROKER、UPDATE_CONSUMER_OFFSET,具体入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeOnewayImpl(Channel, RemotingCommand, long),onway方式虽然只发送,但是服务端照样会把响应给返回,只是客户端不处理响应(查了下REGISTER_BROKER、UPDATE_CONSUMER_OFFSET命令在服务端对onway发送方式的处理确实是会返回数据)。这样是否有个问题?数据会缓存在网络层,这样是否会导致最终tcp接收缓冲区满了?是不是服务端在writeAndFlush响应数据前应该判断下isOnewayRPC()呢?答案是服务端对于oneway请求是不会的返回数据的,服务端处理请求的入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand),该方法执行步骤是先根据命令找到处理器和线程池,吧task提交到线程池处理,task执行的时候先执行处理器,处理器返回处理结果,接着判断是否是oneway,oneway方式不返回数据,从下图可以看出

2019111502013730

总结说明:rokcetmq采用netty通讯,netty是个异步非阻塞,producer作为一个netty client,发送本质就是个异步,但是做成同步情况就是异步转同步的情况,那么必须要采用接收线程(netty io 线程)必须匹配到原请求(producer发送线程),那么通常的匹配方式就是根据唯一key从ConcurrentHashMap集合,可以找到原请求,如果找不到,说明则超时了,已经被扫描线程给移除了。我的工作中同步转异步也是这样做的,不同之处是用的wait和notify等待和唤醒,rmq采用的countdownlatch。

rmq的netty使用是非常好的例子,可以直接参考整合到自己项目。

推荐阅读