首页 > 技术文章 > kafak入门 第七篇 揭开神秘的“位移主题”面纱

tugeboke 2019-10-29 17:52 原文

今年天学习kafka的中的位移主题,也是kafka 中的内部主题: _consumer_offset
旧版本的位移消息是保存在zookeeper中的,但是在新版本中位移消息是存放在Broker中的位移主题中:_consumer_offset,使用kafka主题保存消息同时也实现了高持久性好搞频的读写性能。这个唯一主题也是一个普通的主题,可以手动的操作,但是不建议手动的管理它让系统自己管理是最好的。
虽说唯一主题是一个普通的主题,但是它的消息格式确实kafka自己定义的,用户不能修改,也就是说你不能随意地向这个主题写入消息,否则一旦写入的消息格式不满足kafka的定义,那么就会导致Broker的奔溃。
主题存的到底是什么格式的消息呢?所谓的消息格式,你可以简单的理解为一个KV对。key和Value分别表示消息的健值和消息体,在kafka中它们就是字节数组而已。
位移主题的key中应该保存3个部分:<Group ID,主题名,分区号>,实际上位置主题消息格式不是只有一种,事实上他有三种格式。另外俩种格式如下:
1. ⽤于保存 Consumer Group 信息的消息。
2. ⽤于删除 Group 过期位移甚⾄是删除 Group 的消息。
第 1 种格式⾮常神秘,以⾄于你⼏乎⽆法在搜索引擎中搜到它的身影。不过,你只需要记住它是 ⽤来注册 Consumer Group 的就可以了。 第 2 种格式相对更加有名⼀些。它有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark。下次你在 Google 或百度中⻅到这些词,不⽤感到惊讶,它们指的是⼀个东⻄。这 些消息只出现在源码中⽽不暴露给你。它的主要特点是它的消息体是 null,即空消息体。 那么,何时会写⼊这类消息呢?⼀旦某个 Consumer Group 下的所有 Consumer 实例都停⽌ 了,⽽且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写⼊ tombstone 消 息,表明要彻底删除这个 Group 的信息。
 
 
通常来说,当kafka集群中的第一个Consumer程序启动时,kafka会自动创建位移主题。因为位移主题也是普通的主题,所以也有对应的主题分区和主题副本。在Broker端的参数offsets.topic.num.partitions设置分区数,默认是50建议不要修改这个参数,使用参数offsets.topic.replication.factor来设置副本数默认是3。
 
 
那么什么地方会用到位移主题呢?
Consumer提交位移是到位移主题,目前kafka Consumer提交位移的方式有俩种方式: 自动提交和手动提交位移
Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer在后台默默底为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。自动提交省事且能保证消息消费不会丢失,但是失去了灵活性和可控性。
参数enable.auto.commit,如果值是false就是手动提交位移,作为开发的你就该承担起提交位移的责任。同时kafkaConsumer提供了提交位移的方法,如consumer.commitSync,当调用时就会想位移消息中写入消息。
 
如果选择自动提交位移就会有一个问题:只要Consumer一直启动着,它就会无限期的向位移主题中写入消息。
 
我们来举个极端⼀点的例⼦。假设 Consumer 当前消费到了某个主题的最新⼀条消息,位移是 100,之后该主题没有任何新消息产⽣,故 Consumer ⽆消息可消费了,所以位移永远保持在 100。由于是⾃动提交位移,位移主题中会不停地写⼊位移 =100 的消息。显然 Kafka 只需要保 留这类消息中的最新⼀条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对 位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。 Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。国内很多⽂献都将其翻 译成压缩,我个⼈是有⼀点保留意⻅的。在英语中,压缩的专有术语是 Compression,它的原理 和 Compaction 很不相同,我更倾向于翻译成压实,或⼲脆采⽤ JVM 垃圾回收中的术语:整 理。 不管怎么翻译,Kafka 使⽤Compact 策略来删除位移主题中的过期消息,避免该主题⽆限期膨 胀。那么应该如何定义 Compact 策略中的过期呢?对于同⼀个 Key 的两条消息 M1 和 M2,如 果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描⽇志的所有消 息,剔除那些过期的消息,然后把剩下的消息整理在⼀起。我在这⾥贴⼀张来⾃官⽹的图⽚,来 说明 Compact 过程

 

 

 

图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消 息,因为它是最新发送的。
Kafka 提供了专⻔的后台线程定期地巡检待 Compact 的主题,看看是否存在满⾜条件的可删除 数据。这个后台线程叫 Log Cleaner。很多实际⽣产环境中都出现过位移主题⽆限膨胀占⽤过多 磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查⼀下 Log Cleaner 线程的状 态,通常都是这个线程挂掉了导致的。
 

推荐阅读