首页 > 技术文章 > kafka的使用经验

geektcp 2018-09-08 15:31 原文

kafka参数说明(参考):
https://www.cnblogs.com/weixiuli/p/6413109.html


kafka时间戳字段原因(过期清理,日志切分,流式处理),0.10版本开始才有时间戳概念
https://www.cnblogs.com/huxi2b/p/6050778.html

kafka消息是存放在磁盘上,发送一次,累积到一定数量或者时间间隔就落盘一次,消费一次就读一次磁盘

topic划分为若干分区,分区对一个目录,分区划分为segment,一个segment对应三个二进制文件(后缀分别是index,log,timeindex),类似mysql存储机制
消息数据存放在log文件里面,对应的位置存放在index里面,时间戳存放在timeindex里面


分区有副本概念,如果一个topic有10个分区,分3个节点,那么可能是3/3/4的存放方式,比如kjLog-1,kjLog-4,kjLog-7这样的存放方式。
但是其他kjLog-*目录也会有,但是目录下的index,log,timeindex文件一定是空的,没有数据。


副本选举机制:
如果有多个副本,会有一个选举机制,假设有1个分区有5个副本,共6份,如果分区1挂了,其他5份有3份及时和分区1同步了,就会进入ISR,这个东西存放在zk里面
当kafka发现有个分区挂了后,就从ISR找到每个可用的副本所在节点ID,下发通知,这些副本里面3份及时的分区对应的节点就会同时向zk注册,zk机制是只有一个节点能注册成功,这样先注册的就选举成功,成为新的分区,其他分区都要跟它保持同步。


===================
kafka配置:
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
tar -zxvf kafka_2.11-0.10.1.1.tgz -C /home/dig/service/
cd /home/dig/service/
ln -s kafka_2.11-0.10.1.0 kafka

vim /home/dig/service/kafka/config/consumer.properties
zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group


vim /home/dig/service/kafka/config/producer.properties
metadata.broker.list=bdp-001:9092,bdp-002:9092,bdp-003:9092
compression.codec=none

producer.type=sync
serializer.class=kafka.serializer.DefaultEncoder
batch.num.messages=200


vim /home/dig/service/kafka/config/server.properties
broker.id=0
host.name=bdp-001
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/dig/service/kafka/data
num.partitions=6
num.recovery.threads.per.data.dir=1

#消息保留时间
log.retention.hours=168

#消息最多保留的字节数
log.segment.bytes=1073741824

#每隔多久检查上面两个参数是否达到阀值
log.retention.check.interval.ms=300000

log.cleaner.enable=false

zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
delete.topic.enable=true


另外server.properties中,不要启用自动创建topic:
auto.create.topics.enable=true
否则producer发送消息时,提示分区异常。


拷贝到另外两个节点:
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-002:/home/dig/service/
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-003:/home/dig/service/


注意每个节点id不一样:
cat /home/dig/service/kafka/config/server.properties | grep broker.id


临时启动kafka:
rm -rf /tmp/bdp-logs
/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties


在3台机器上分别执行:
rm -rf /home/dig/service/zookeeper/data/*

ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}
rm -rf /tmp/bdp-logs && ll /tmp/bdp-logs || jps


所有节点关闭完后先启动zk:
/home/dig/service/zookeeper/bin/zkServer.sh start

再启动kafka:
/home/dig/service/kafka/bin/kafka-server-start.sh -daemon /home/dig/service/kafka/config/server.properties
jps
sleep 1
ll /tmp/bdp-logs


查看kafka启动状态:
ls /brokers/ids
输入3个kafka实例的id即集群启动成功:
[1, 2, 3]


apache版本的kafka自带关闭命令无效:
/home/dig/service/kafka/bin/kafka-server-stop.sh /home/dig/service/kafka/config/server.properties

/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties

有效关闭脚本:
ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}


===========================================
测试kafak是否部署成功:


创建Topic
/home/dig/service/kafka/bin/kafka-topics.sh \
--zookeeper etl1:2181,etl2:2181,etl3:2181 \
--replication-factor 2 \
--partitions 3 \
--create \
--topic kjLog


或者这么删除(只删除zk的元数据,分区文件没有删除):
kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic mytest1


删除话题队列:
kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytest1


查看存在的话题队列:
kafka-topics.sh --zookeeper localhost:2181 --list


向话题队列mytest1 发送消息,运行后直接输入:
kafka-console-producer.sh --broker-list kafka1:9092 --topic mytest1

消费者从话题队列mytest3 中取消息(运行后,直接可以看到输出结果,老版本是连zk,新版本连bootstrap及broker):
kafka-console-consumer.sh \
--bootstrap-server etl3:9092 \
--topic kjLog \
--consumer-property group.id=kjEtlGroup

# 从头开始消费
--from-beginning \


消费组是在消费时自动生成的,默认是console-consumer-31553,后面的数组是随机的,
也可以消费时指定具体消费组名字


查看有哪些消费组:
kafka-consumer-groups.sh --zookeeper localhost:2181 --list

等价于登录zk,查看有哪些消费组:
ls /consumers/


===========================================
查看消息数量offset:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic kjLog --partitions 0
输出topic的容量大小
kjLog:2:116060276
kjLog:1:70158992
kjLog:0:15411674


查看指定topic的详细信息:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:12181 --topic kjLog --group kjEtlGroup
输出结果:
消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数 属主
Group Topic Pid Offset logSize Lag Owner
kjEtlGroup kjLog 0 15484439 15484445 6 none
kjEtlGroup kjLog 1 70655159 70655189 30 none
kjEtlGroup kjLog 2 116860888 116860904 16 none


查看分区详细信息:
kafka-topics.sh --zookeeper localhost:2181 --describe
输出结果:
Topic:mytest4 PartitionCount:6 ReplicationFactor:2 Configs:
Topic: mytest4 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: mytest4 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1

表示Topic名称是mytest4,一共6个分区,并且每个分区都有两份完全一样的。
分区0的leader节点是在id为2的kafka实例上,复制节点是2和3两个实例,正在服务的节点也是2和3
分区1的leader节点是在id为3的kafka实例上,复制节点是3和1两个实例,正在服务的节点也是3和1


===========================================

迁移消息:
vim topics-to-move.json
{"topics":
[{"topic": "mytest1"}],
"version":1
}


生成迁移计划:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--broker-list "4" \
--topics-to-move-json-file topics-to-move.json \
--generate


输出如下结果:
Current partition replica assignment

{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[1]},{"topic":"mytest1","partition":1,"replicas":[3]},{"topic":"mytest1","partition":0,"replicas":[2]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]},{"topic":"mytest1","partition":1,"replicas":[4]},{"topic":"mytest1","partition":0,"replicas":[4]}]}


将 Proposed partition 对应json内容写入到文件reassignment-node.json中

 

执行topic迁移:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--execute


执行迁移后,会在zk上注册一个节点:
登录zk查看: get /admin/reassign_partitions
结果如下:
{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]}]}


查看迁移结果:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--verify


上面的方式并没有增加新分区,而是对原有分区做了一个副本,增加一个topic的分区方法是:
kafka-topics.sh --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 --alter --partitions 5 --topic mytest1
这个命令的效果是topic为mytest1的分区数量增加到5个,而不是在原有分区数之上再增加5个分区。

kafka分区只能增加,不能通过自带的命令删除(但是直接删除文件夹可以删除),否则报错:
Error while executing topic command The number of partitions for a topic can only be increased
kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased


增加了分区后,迁移分区的方法就简单了,
编写如下json,其中12,13,14是分区id, 101,102,103,104,105,106是kafka实例id
vim
{
"partitions": [{
"topic": "mytest1",
"partition": 12,
"replicas": [101,102]
},
{
"topic": "mytest1",
"partition": 13,
"replicas": [103,104]
},
{
"topic": "mytest1",
"partition": 14,
"replicas": [105,106]
}],
"version": 1
}


然后执行分区重分布命令:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file partitions-extension-push-token-topic.json \
--execute

这样topic为mytest1的分区就改变了,存放到新的物理服务器的节点上了。


=============================================================
官方文档:
http://kafka.apache.org/documentation.html


kafka开发文档:
http://www.aboutyun.com/thread-9906-1-1.html


官方文档翻译:
http://www.cnblogs.com/lzqhss/p/4434901.html


参考翻译:
http://blog.csdn.net/suifeng3051/article/details/48053965


=============================================================
问题1:
http://www.oschina.net/question/196281_248633?sort=time

这是Kafka消息分区造成的,你可以去了解一下Kafka是如何分区的,就知道原因了。

问题原因可能是:你的所有消息的Key都是一样的,使用默认的Partitioner: hash(key)%numPartitions,这样每次的partion num都是一样的,所以数据都落到一个分区了。

而同一consumer grop并行消息,也是按照分区来分配的,因为只有一个分区上有数据,所以有一个consumer始终拿不到消息。

解决办法:1.自定义分区函数。2.消息散列为不同的key

 

问题2:
[2016-08-01 14:13:19,609] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:19,615] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:27,784] ERROR [test-consumer-group_bdp-003-1470075194621-ae0d9636], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
kafka.common.ConsumerRebalanceFailedException: test-consumer-group_bdp-003-1470075194621-ae0d9636 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)


解决这是一个bug:
There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.


问题3:
zookeeper启用日志文件,修改conf下面的log4j.properties,但无效。

 

再修改bin下的zkEnv.sh才能生效:
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="/home/dig/service/zookeeper/log"
fi

if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi


问题4:
kafka消息存放多久?
这个取决于kafka消息清理策略。
默认kafka消息的清理策略是删除,但是没有明确写入server.propertiesp文件中:
log.cleanup.policy=delete

默认消息保存一周时间,默认不限制消息总量的长度,检查间隔是1ms。
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=1

 

问题5:
logserver报错:
[ERROR][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,129][LogServiceImpl.java:onCompletion:220]:Fail to send record to Kafka. Key: appupgrade_alive_user, Value Length: 190
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

原因是,kafka集群宕机或者重启后,topic partition重新选举leader,并且不是原来的leader后,而生产者还是使用原来的leader来生产消息,报错。
解决:重启生产者所在服务器。

 

问题6:
logserver报错:
[WARN][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,234][Sender.java:completeBatch:298]:Got error produce response with correlation id 23026690 on topic-partition kjLog-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION

原因同上,都是因为leader重新选举导致。

 

问题7:
logserver报错:
536116 [ERROR][kafka-producer-network-thread | producer-1][2017-04-04 09:26:41,952][LogServiceImpl.java:onCompletion:102]:Fail to send record to Kafka. Key: konkasysteminfo_open_p age, Value Length: 462
536117 org.apache.kafka.common.errors.TimeoutException: Batch containing 46 record(s) expired due to timeout while requesting metadata from brokers for kjLog-2

原因不确定,初步怀疑也是分区选举leader变更导致。

 

===========================================
1.kafka为什么要在topic里加入分区的概念?
topic是逻辑的概念,partition是物理的概念,对用户来说是透明的。producer只需要关心消息发往哪个topic,而consumer只关心自己订阅哪个topic,并不关心每条消息存于整个集群的哪个broker。
logsize是写入分区的消息条数,offset是已经消费的条数,这两个只都是从0开始,每次累加(i++),单位是条,不是字节
lag表示没有消费的,已经缓存的条数,lag = logsize - offsize

为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。Partition的引入就是解决水平扩展问题的一个方案。

每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展
里所讲,每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展。


2.如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗?
“segment消息写满后”,consume消费数据并不需要等到segment写满,只要有一条数据被commit,就可以立马被消费

segment对应一个文件(实现上对应2个文件,一个数据文件,一个索引文件),一个partition对应一个文件夹,一个partition里理论上可以包含任意多个segment。所以partition可以认为是在segment上做了一层包装。
这个问题换个角度问可能更好,“为什么有了partition还需要segment”。
如果不引入segment,一个partition直接对应一个文件(应该说两个文件,一个数据文件,一个索引文件),那这个文件会一直增大。同时,在做data purge时,需要把文件的前面部分给删除,不符合kafka对文件的顺序写优化设计方案。引入segment后,每次做data purge,只需要把旧的segment整个文件删除即可,保证了每个segment的顺序写。


3、kafka的消息生产者使用的包是import kafka.javaapi.producer.Producer,不是kafka.producer.Producer
import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

private final Producer<Integer, String> producer;
props.put("metadata.broker.list", KafkaProperties.kafkaConnect);
producer = new Producer<Integer, String>(new ProducerConfig(props));


4、kafka消息发送字节流,扩展序列化类 参考链接:
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.twill/twill-core/0.1.0-incubating/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java


5、kafka消息部分代码解析:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

//topicCountMap是设置每个topic开多少线程,每个线程处理执行多个task,
//每个task会从每个topic的一个分区中消费数据,如果有3个分区,每个task会同时从3个topic的分区中获取数据,即每个task会从3个分区中获取数据。
//这里的参数new Integer(1)表示kafka消费服务器开多少个线程,通常最好是线程数量小于等于对应topic的分区数量
topicCountMap.put(topic, new Integer(2));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

//这里的get(0)表示从第一个分区中提取消息
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();

 

推荐阅读