首页 > 技术文章 > Kafka生产者

lenoblog 2020-11-23 17:03 原文

一、前言

一个正常的生产逻辑需要具备以下几个步骤:

  • 配置生产者客户端参数及创建相应的生产者实例;
  • 构建待发送的消息;
  • 发送消息;
  • 关闭生产者实例;

二、原理分析

2.1 整体架构

整个生产者客户端由两个线程协调运行,这两个线程分别为主线和Sender线程;

在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累计加器(消息收集器)中;

Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator  

RecordAccumulator  主要用来缓存消息以便Sender线程可以批量发送,可以通过buffer.memory配置,默认值32M;

如果生产者发送消息的速度超过发送到服务器的速度,会导致生产者空间不足,这时KafkaProducer的send()方法调用将会异常,这取决于max.blocks.ms的配置,默认值:60000,即60秒。

ProduceBatch

主线程中发送的消息会被追加到RecordAccumulator  某个双端队列中(Deque)中,在内部为每个分区都维护了一个双端队列,队列中的内容就是ProduceBatch,即Deque<ProduceBatch>。消息写入时,追加到队列的尾部,读取时从双端头部读取。ProduceBatch包含多个ProduceRecord,ProduceRecord即生产者创建的消息。

ProduceRecord拼凑较大ProduceBatch,减少网络吞吐量,如生产端向很多分区发送消息,则修改参数buffer.memory调大整体的吞吐量。

消息在网络上是以字节的形式传输的,在发送前需创建一块内存区域来保存。

RecordAccumulator  内部还有一个BufferPool,它用来实现ByteBuffer的复用,这个大小由batch.size参数来指定,默认值:16KB;

ProduceBatch的大小和batch.size参数有密切关系。

InFlightRequests 

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求。

2.2 元数据更新

当需要更新元数据时,会先挑选出leastLoaderNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息,这个过程是由Sender线程发起的。

三、重要参数

1.acks

这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。

acks = -1或 acks = all ,生产者在消息发送后,需要等待ISR中所有副本成功写入消息后才能够收到来自服务端的成功响应。

acks = 0 ,生产者发送消息后,不需要等待任务服务端的响应。

2.max.request.size

这个参数用来限制生产者客户端能发送消息的最大值,默认为1MB。

3.retries 和retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认值为0,发生异常不进行任何重试。

retry.backoff.ms设置二次重试之前的时间间隔,默认值为100。

4.compression.type

消息的压缩方式,默认值"non"。

5.connection.max.idle.ms

多久关闭闲置的连接,默认值54000ms,即9分钟;

6.linger.ms

这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入的时间,默认值为0,即填满或等待时间超过liner.ms值就发送;

7.receive.buffer.bytes

这个参数用来设置Socket接收消息缓冲区的大小,默认32KB;

8.send.buffer.bytes

这个参数用来设置Socket发送缓冲区的大小,默认值128KB;

9.request.timeout.ms

这个参数用来配置Producer等待请求响应的最长时间,默认值30000ms;

四、拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3个方法:

1 public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
2 public void onAcknowledgement(RecordMetadata metadata, Exception exception);
3 public void close();

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。

五、序列化器

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如 StringSerializer,而消费者使用了另一种序列化器,比如 IntegerSerializer,那么是无法解析出想要的数据的。

序列化器都需要实现org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法:

1 public void configure(Map<String, ?> configs, boolean isKey)
2 public byte[] serialize(String topic, T data)
3 public void close()

configure() 方法用来配置当前类,serialize() 方法用来执行序列化操作。而 close() 方法用来关闭当前的序列化器。

configure() 方法,这个方法是在创建 KafkaProducer 实例的时候调用的,主要用来确定编码类型。

serialize用来编解码,如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。

六、分区器

消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。

Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Partitioner 接口,这个接口中定义了2个方法,具体如下所示。

1 public int partition(String topic, Object key, byte[] keyBytes, 
2                      Object value, byte[] valueBytes, Cluster cluster);
3 public void close();

其中 partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close() 方法在关闭分区器的时候用来回收一些资源。

在默认分区器 DefaultPartitioner 的实现中,close() 是空方法,而在 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希,最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区。

自定义的分区器,只需同 DefaultPartitioner 一样实现 Partitioner 接口即可。由于每个分区下的消息处理都是有顺序的,我们可以利用自定义分区器实现在某一系列的key都发送到一个分区中,从而实现有序消费。

 

推荐阅读