An Activity Diagram of the Kafka Producer

benfly 2018-08-06 17:29

Using the Kafka producer API involves two main phases: creating the producer instance, and calling the producer to send data (synchronously or asynchronously). Creating the producer instance also starts a sender thread, which polls continuously, updating the metadata and draining data from the accumulator to actually send it to a broker. The activity diagram below roughly describes the internal call flow of the producer API.

[Activity diagram omitted: the internal call flow of the producer API]

Creating the producer instance:

1: The client reads the producer config; a sample looks like this:

{
	security.protocol=SASL_PLAINTEXT,
	bootstrap.servers=server1.com:8881,server2:8882,
	value.serializer=xxx.serialization.avro.AvroWithSchemaSpecificSer,
	key.serializer=org.apache.kafka.common.serialization.LongSerializer,
	client.id=15164@hostname,
	acks=all
}

2: Create the producer instance by calling:

Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
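
Note that this constructor overload takes serializer instances rather than class names from the config. As a minimal sketch of wiring the sample config above into a producer (the LongSerializer/StringSerializer pairing and the literal values here are illustrative, not prescribed by the article):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1.com:8881,server2:8882");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer"); // illustrative client id

// Serializers passed here take precedence over key.serializer/value.serializer in props.
Producer<Long, String> producer =
        new KafkaProducer<>(props, new LongSerializer(), new StringSerializer());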

3: KafkaProducer is the key entry point of the producer; it wraps all the components invoked later. During construction it creates the following components in order.

Create the Metadata instance, passing refreshBackoffMs (the minimum refresh interval, retry.backoff.ms, default 100 ms) and metadataExpireMs (the maximum metadata retention time, metadata.max.age.ms, default 300000 ms).

Metadata holds information about topics, partitions, and brokers:

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

4: Create the RecordAccumulator

The accumulator is a bounded in-memory queue that buffers records. When the client sends data, the record is appended to the tail of the queue; if the buffer memory is exhausted, the append call blocks. Internally, the batches member holds the data waiting to be drained by the sender:

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

It is constructed by calling:

this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // batch.size, default 16384 (16K): bytes per record batch
                    this.totalMemorySize, // buffer.memory, default 33554432 (32M): total buffer memory in bytes
                    this.compressionType, // compression.type, default none: producer-side compression; valid values are none, gzip, snappy, lz4
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs, // retry.backoff.ms, default 100 ms
                    metrics,
                    time);
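
These knobs are all ordinary producer configs; a sketch of setting them on the client (the values are illustrative, not recommendations):

props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");       // batch.size: target bytes per batch
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");            // linger.ms: wait up to 5 ms for a batch to fill
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // buffer.memory: total accumulator memory
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // compression.type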

5: Create the channel builder. ChannelBuilder is an interface whose concrete implementations, SaslChannelBuilder, SslChannelBuilder, and PlaintextChannelBuilder, correspond to the different security protocols. In Java NIO, the channel is the part that actually performs I/O; in Kafka, the KafkaChannel class wraps a SocketChannel. Login authentication is performed when the builder instance is created.

ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
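
Because the sample config above uses SASL_PLAINTEXT, SaslChannelBuilder is selected and the login happens here. A hedged sketch of the client-side properties that drive that login (the PLAIN mechanism and the credentials are placeholders for whatever your cluster actually requires):

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN"); // assumption: PLAIN; could be GSSAPI, SCRAM-SHA-256, etc.
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"alice\" password=\"alice-secret\";"); // placeholder credentials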

6: Create the NetworkClient, which encapsulates low-level network access and metadata updates.

            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);

 

The NetworkClient constructor is passed a Selector instance, a wrapper around the Java NIO selector. The Selector constructor opens the underlying java.nio selector, and the ChannelBuilder created earlier is handed to this internal member of NetworkClient as well:

this.nioSelector = java.nio.channels.Selector.open();
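
To make the wrapping concrete, here is a minimal plain-NIO sketch of the JDK calls involved (the broker address is a placeholder; Kafka's own Selector adds buffering, authentication, and metrics on top of this):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

java.nio.channels.Selector nioSelector = java.nio.channels.Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false); // non-blocking, as Kafka configures it
socketChannel.connect(new InetSocketAddress("broker-host", 9092)); // placeholder address
socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // mirrors step 9 below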

7: Create the Sender runnable and start its thread, handing it the NetworkClient, Metadata, and accumulator created above. Sender is the key class: all the real work happens on this thread. Once started, it loops forever, updating metadata and sending messages.

            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
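
KafkaThread is essentially a named daemon thread with an uncaught-exception logger; the equivalent plain-Java pattern, sketched (the thread name is illustrative):

Thread ioThread = new Thread(sender, "kafka-producer-network-thread | my-client");
ioThread.setDaemon(true); // a daemon thread will not keep the JVM alive by itself
ioThread.start();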

 

Sending data:

1: The client sends data

The client builds a ProducerRecord and calls send to publish it. send is asynchronous: it returns a Future and comes back immediately, having only written the record into the in-memory RecordAccumulator. To send synchronously and confirm whether the message succeeded, call get() on the returned Future, which blocks the sending thread.

ProducerRecord<K, V> producerRecord = new ProducerRecord<>("topic", data);
producer.send(producerRecord, new ProducerCallBack(requestId));

When calling send, a callback object may be passed in; the callback handles the acknowledgement once the send completes.
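
A hedged sketch of such a callback (ProducerCallBack is the application-defined class from the snippet above; the requestId field and the logging are illustrative):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class ProducerCallBack implements Callback {
    private final long requestId; // illustrative correlation id

    ProducerCallBack(long requestId) {
        this.requestId = requestId;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Runs on the sender's I/O thread once the broker acks or the send fails,
        // so keep it cheap and non-blocking.
        if (exception != null) {
            System.err.println("request " + requestId + " failed: " + exception);
        } else {
            System.out.println("request " + requestId + " sent to " + metadata.topic()
                    + "-" + metadata.partition() + " @ offset " + metadata.offset());
        }
    }
}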

2: The record is stored in the accumulator

Inside send, if interceptors are configured, they first get to process the record; the (possibly modified) record is then handed to doSend:

ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback); // asynchronously send a record to the topic
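
For reference, a minimal sketch of a producer interceptor (the value-prefixing logic is purely illustrative); it would be enabled through the interceptor.classes config:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<Long, String> {
    @Override
    public ProducerRecord<Long, String> onSend(ProducerRecord<Long, String> record) {
        // Called in send() before serialization and partitioning; may return a modified record.
        return new ProducerRecord<>(record.topic(), record.key(), "v1|" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acks the record or the send fails.
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}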

Before doSend appends the record to the accumulator it has a few things to do. First it calls waitOnMetadata to make sure metadata for the given topic, including its partitions, is available. If the partitions are not known, waitOnMetadata loops: it requests a metadata update and wakes the sender thread (sender.wakeup()) to perform it; if the allowed block time is exceeded, it exits with a timeout exception.

long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // max.block.ms, the longest KafkaProducer.send() may block, default 60000 ms
while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            sender.wakeup(); // wake the sender thread to update the metadata
            metadata.awaitUpdate(version, remainingWaitMs); // wait for the metadata update; throws a timeout exception once max.block.ms is exceeded
            ......
}

If the metadata is successfully updated before max.block.ms is reached, the record's key and value are serialized, the record is appended to the accumulator, and the sender thread is woken again:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

At this stage the data has only been written to an intermediate buffer; nothing has actually been transmitted to a broker.
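
If the application needs certainty that buffered records have actually reached the broker at some point, it can flush, or block on the returned future (a sketch; exception handling omitted):

producer.flush(); // blocks until every previously sent record has completed

// or per record (get() throws InterruptedException/ExecutionException in real code):
RecordMetadata md = producer.send(producerRecord).get();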

 

The sender thread

1: Once started, the sender thread repeatedly executes Sender.run(now). Its main job is to drain data from the accumulator and turn it into a list of producer requests, List<ClientRequest>.

 

2: If there are producer requests to send, NetworkClient.send is called; internally it calls doSend(ClientRequest, long), which adds the request to the in-flight queue inFlightRequests:

{NetworkClient}    

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request); // add the request to the in-flight queue
    selector.send(request.request());   // the data is stored in the KafkaChannel's memory
}

 

3: Selector.send(Send) is then called: it looks up the destination KafkaChannel from the Send and places the data into that channel's pending-send memory. The data has still not actually been transmitted to a broker:

{Selector} 
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination()); // look up the destination channel
    try {
        channel.setSend(send); // store the data in the destination channel's memory
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

 

4: Once all the data drained from the accumulator has been placed into the corresponding channels, NetworkClient.poll is called; this is where the socket is actually read and written.

 

5-8: Metadata may need updating first. In maybeUpdate, if the metadata timeout (metadataTimeout) has dropped to 0, the metadata needs refreshing: the least-loaded node is selected (leastLoadedNode), and if no connection to it exists yet, a socket connection is created first. Step 7 calls Selector.connect, and step 8 calls SocketChannel.open and SocketChannel.connect to establish the socket connection.

 

{NetworkClient}

        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }

private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId)) { // the connection is already established and we can send
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                if (metadata.needMetadataForAllTopics())
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(clientRequest, now); // send the request: the data is stored in the channel's memory
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now); // create the socket connection
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                this.lastNoNodeAvailableMs = now;
            }
        }

 

9: Once the socket connection is established, the channel is registered with the nioSelector, and the channelBuilder created earlier builds a KafkaChannel for it. The KafkaChannel creates and wraps the transport layer (TransportLayer), which in turn wraps the SocketChannel; channels are kept in a Map, and the KafkaChannel also creates an authenticator.

{Selector}

     SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // register with the Selector
     KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // build the KafkaChannel
     key.attach(channel);
     this.channels.put(id, channel);

 

10: If the node's connection state is connected and the channel is ready (the transport layer is ready and the authenticator is complete), then maybeUpdate above can send the metadata request: NetworkClient.doSend places it into the in-flight request queue (inFlightRequests) and the corresponding KafkaChannel's memory. If a request sits in inFlightRequests past the request timeout (request.timeout.ms, default 30000 ms, i.e. 30 seconds), the socket connection it was sent on is closed and a metadata refresh is scheduled.

 

11: Selector.poll performs the actual I/O. When a selection key is ready, the corresponding KafkaChannel is fetched from that key, and channel.write sends the data to the broker.
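
In plain-NIO terms, the select-then-write loop that Selector.poll boils down to looks roughly like this (continuing the standalone sketch from the instance-creation section; timeoutMs and pendingBuffer are placeholders, and in Kafka the write is KafkaChannel.write() draining the queued Send):

if (nioSelector.select(timeoutMs) > 0) {
    for (SelectionKey key : nioSelector.selectedKeys()) {
        if (key.isWritable()) {
            SocketChannel ch = (SocketChannel) key.channel();
            ch.write(pendingBuffer); // in Kafka: KafkaChannel.write() drains the queued Send
        }
    }
    nioSelector.selectedKeys().clear(); // clear the ready set after processing
}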

 
