4.2.1 优选副本的选举
4.2.1.1 什么是优先副本
分区使用多副本机制来提升可靠性,但是只有leader副本对外提供读写服务.而follower副本只负责在内部进行消息的同步.如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用.此时就需要从剩余的follower副本中挑选一个新的leader副本继续对外提供服务.
broker节点中的Leader副本个数决定了这个节点负载的高低
在创建主题的时候,主题的分区和副本会尽可能的均匀分布在kafka集群的各个broker节点.对应的Leader副本的分配也比较均匀.例如下面的 topic-demo
主题:
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo |
可以看到,leader副本均匀分布在所有的broker节点.另外,同一个分区,在同一台broker节点只能存在一个副本.所以leader副本所在的broker节点叫做分区的leader节点.而follower副本所在的broker节点叫做分区的follower节点.
可以想象的是,随着时间的推移,kafka集群中不可避免的出现节点宕机或者崩溃的情况.当分区的Leader节点发生故障时,其中一个follower节点就会成为新的Leader节点.这样导致集群中的节点之间负载不均衡,从而影响kafka整个集群的稳定性和健壮性.
即使原来的Leader节点恢复后,加入到集群时,也只能成为一个新的follower节点,而不会自动”抢班夺权”变成leader.
例如刚才的 topic-demo
分区重启152节点后,leader分布如下:
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo |
尽管kafka非常均匀的将leader副本分布在其他另外2个几点.但是此时152节点的负载几乎为零.
为了有效的治理负载失衡的情况,kafka引入了优先副本(preferred replica)的概念.所谓的优先副本就是在AR集合列表中的第一个副本为优先副本,理想情况下优先副本就是该分区的leader副本.所以也可以称之为 preferred leader
.
比如上面的例子中,分区0的AR集合(Replicas)是[152,153,154].那么分区0的优先副本就是152.Kafka会确保所有主题的优先副本均匀分布.这样就保证了所有分区的leader均衡分布.
4.2.1.2 优先副本选举
所谓的优先副本选举是指通过一定的方式促使优先副本选举为Leader副本,促进集群的负载均衡.这一行为也称之为”分区平衡”.
kafka broker端(server.properties配置文件)有个 auto.leader.rebalance.enble
参数.默认为true.也就是分区自动平衡功能.Kafka会启动一个定时任务,轮询所有的broker节点,自动执行优先副本选举动作.
不过在生产环境中建议将该配置设置为 false
.因为kafka自动平衡分区可能在某些关键高分期时刻引起负面性能问题.也有可能引起客户端的阻塞.为了防止出现此类情况,建议针对副本不均衡的问题进行相应监控和告警,然后在合适的时间通过手动来执行分区平衡.
Kafka中的 kafka-preferred-replica-election.sh
脚本提供了对分区leader副本进行重新平衡的功能.优先副本选举过程是一个安全的过程,kafka客户端会自动感知leader副本的变更.
命令用法如下:
1 | bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 |
但是这样一来会对kafka集群的所有主题和分区都执行一遍优先副本的选举操作.如果集群中包含大量的分区,那么可能选举会失败,并且会对性能造成一定的应用.比较建议的是使用 path-to-json-file
参数来小批量的对部分指定的主题分区进行优先副本的选举操作.该参数指定一个JSON文件,这个JSON文件保存需要执行优先副本选举的分区清单.
举个例子,对上面的 topic-demo
分区进行优先副本选举操作.先创建一个JSON文件,文件名可以任意:
1 | { |
将上述内容保存为 election.json
文件.然后执行下列命令:
1 | [hadoop@bi-dev152 ~]$ kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file ~/election.json |
提示优先副本选举成功.下列结果显示leader副本已经均衡分配到所有Broker节点了
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo |
在实际生产环境中,建议使用这种方式来分批的执行优先副本选举操作.杜绝直接粗暴的进行所有分区的优先副本选举.另外,这类操作也应该需要避开业务高峰期,以免对性能造成负面影响,或者出现意外故障.
4.2.2 分区重分配
当集群中一个Broker节点宕机,该节点的所有副本都处于丢失状态.kafka并不会自动将这些失效的分区副本自动迁移到集群其他broker节点.另外当集群中新增一台Broker节点时,只有新创建的主题分区才能被分配到这个节点上,而之前的主题分区并不会自动的加入到新节点(因为在创建时,并没有这个节点).这就导致新节点负载和原有节点负载之间严重不均衡.
为了解决这些问题,需要让分区副本再次进行合理的分配.也就是所谓的分区重分配.kafka提供了 kafka-reassign-paritions.sh
脚本执行分区重分配的工作.可以在集群节点失效或者扩容时使用.使用需要3个步骤:
- 创建一个包含主题清单的JSON文件
- 根据主题清单和Broker节点清单生成一份重分配方案
- 执行具体重分配工作
要执行分区重分配,前提是broker节点清单数量要大于或者等于副本因子数量,否则会报错
Partitions reassignment failed due to replication factor: 3 larger than available brokers: 2
下面创建一个4分区,2个副本因子的主题 topic-reassign
举例.假定要将152这个broker节点下线.下线之前需要将该节点上的分区副本迁移出去.
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-reassign --replication-factor 2 --partitions 4 |
第一步,创建一个JSON文件(文件名假定为reassign.json).文件内容是主题清单:
1 | { |
第二步,根据这个JSON文件和指定要分配的broker节点列表生成一份候选重分配方案:
1 | [hadoop@bi-dev152 ~]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file ~/reassign.json --broker-list 153,154 |
在上面的例子中有以下几个参数:
--zookeeper
这个参数已经非常熟悉了
--generate
指令类型参数,类似于kafka-topics.sh脚本中的 --create
, --list
. --describe
等
--topics-to-move-json-file
指定主题清单文件路径
--broker-list
指定要分配的broker节点列表
上面的例子中打印了2个JSON格式内容:
Current partition replica assignment
表示目前的分区副本分配情况,在执行分区重分配前最好备份这个内容,以便后续回滚操作
Proposed partition reassignment configuration
表示候选重分配方案.这里只是一个方案,并没有真正执行.
将第二个Json内容格式化输出后,我们发现这个方案正如我们计划的那样,将该主题的所有分区下的AR副本集合分配到153和154节点,所有副本已经从即将要下线的152节点迁移走.
1 | { |
第三步,将 Proposed partition reassignment configuration
JSON文件内容保存在一个文件中(假定为project.json).然后执行具体的重分配的动作,命令如下:
1 | [hadoop@bi-dev152 ~]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file project.json |
这里仍然打印了之前的副本分配方案,并且提示保存到JSON文件,以便回滚
这里使用了2个不同的命令参数:
--execute
指令类型参数,执行重分配动作--reassignment-json-file
指定重分配方案文件路径
再次查看 topic-reassign
主题分区副本分配情况,所有的副本都从152迁移出去,此时该节点可以顺利下线
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-reassign |
当然,我们也可以直接编写第二个JSON文件来自定义重分配方案,这样就不需要执行上面的第一步和第二步操作了.
分区重分配的基本原理是为每个分区添加新副本(增加副本数量),新副本会从leader副本复制所有的数据.复制完成后,控制器将旧副本从副本清单里移除.(恢复成原来的副本数量).
所以,分区重分配需要确保有足够的空间,并且避免在业务高峰期操作
从刚才的主题分区结果可以看到,大部分的分区leader副本都集中在153这个broker节点.这样负载非常不均衡,我们可以继续借助 kafka-preferred-replica-election.sh
脚本执行一次优先副本选举.
1 | [hadoop@bi-dev152 ~]$ kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json |
和优先副本选举一样,分区重分配对集群的性能有很大的影响.需要占用额外的磁盘,网络IO等资源.在生产环境执行操作时应该分批次执行.
4.2.3 复制限流
我们了解分区重分配的本质在于数据复制,先增加新副本,进行数据同步,然后删除旧副本.如果副本数据量太大必然会占用很多额外的资源,从而影响集群整体性能.kafka有限流机制,可以对副本之间的复制流量进行限制.
副本复制限流有2种实现方式:
kafka-config.sh
kafka-reassign-partitions.sh
前者的实现方式有点繁琐,这里介绍后者的使用方式.
kafka-reassign-partitions.sh
的实现方式非常简单,只需要一个throttle参数即可.例如上面的例子中副本都在153和154节点,现在继续使用分区重分配,让副本从153节点迁移到152节点.但是这次使用限流工具
首先,修改 project.json
文件,将153替换成152
1 | sed -i 's/153/152/g' project.json |
然后,执行分区重分配,这里使用 --throttle
参数,指定一个限流速度(单位是B/s)
1 | [hadoop@bi-dev152 ~]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file project.json --throttle 10 |
上面的示例输出中提示以下3点信息:
- 需要周期性的使用
--verify
参数来周期性的查看副本复制进度,直到分区重分配完成,也就是说需要显示的使用这种方式确保分区重分配完成后解除限流的设置 - 限流的速度为10B/s
- 如果想要修改限流速度,重复此条执行命令,修改throttle的值即可
接下来使用 --verify
参数查看复制进度.下面的示例显示复制已经完成,并且限流已被解除
1 | [hadoop@bi-dev152 ~]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file project.json |
此时153的副本已经被移除
1 | [hadoop@bi-dev152 ~]$ kafka-topics --describe --topic topic-reassign |
4.2.4 修改副本因子
上面的例子中分区重分配,将副本从一个broker节点中移除,此时kafka集群的broker节点数量只剩下2个.副本因子也只有2个.这里有个问题,此时153节点重启,或者新增broker节点后,如何将新增的broker节点加入进群,扩展副本数量呢?或者还有一种情况,当创建主题和分区后,想要修改副本因子呢?
kafka-reassign-parition.sh
脚本同样实现了修改副本因子的功能..仔细观察一下分区重分配案例中的 project.json
文件内容:
1 | [hadoop@bi-dev152 ~]$ cat project.json |
json文件中的副本集合(replicas)都是2个副本,我们可以很简单的添加一个副本.比如对于分区0而言,可以将153节点添加进去.(其他分区也是如此)
1 | { |
执行 kafka-reassign-partition.sh
脚本,执行命令的方法和参数和分区重分片几乎一致:
1 | [hadoop@bi-dev152 ~]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file add.json |
查看副本分配情况.副本数量已经增加到了3个
1 | Topic:topic-reassign PartitionCount:4 ReplicationFactor:3 Configs: |
虽然副本因子增加到3个,但是Leader还是没有分配到新的153这个broker节点.此时可以通过优先副本选举重新分配
1 | [hadoop@bi-dev152 ~]$ kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json |
重点: 与修改分区数量不同,副本数还可以减少,修改方法和命令几乎一样,只需要编辑JSON配置文件即可.这里就不再演示
4.2.5 如何选择合适的分区数量
如何选择合适的分区数量是需要经常面对的问题,但是这个问题似乎并没有权威的标准答案,需要根据实际的业务场景,硬件资源,应用软件,负载等情况做具体考量.这一章节主要介绍与本问题相关的一些决策因素,以供参考
4.2.5.1 性能测试工具
在生产环境中设定分区数量需要考虑性能因素.所以性能测试工具必不可少,kafka本身提供了用于生产者性能测试的 kafka-producer-pref-test.sh
脚本和用于消费者性能测试的 kafka-consumer-perf-test.sh
脚本
首先创建一个用于测试的分区为1,副本为1的 topic-1
的主题:
1 | Topic:topic-1 PartitionCount:1 ReplicationFactor:1 Configs: |
其次.我们往这个主题发送100万条消息,并且每条消息大小为1024B,生产者对应的acks参数为1:
1 | [hadoop@bi-dev152 ~]$ kafka-producer-perf-test.sh --topic topic-1 --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1 |
示例中使用了多个参数:
num-records
: 指定发送消息的总条数
record-size
: 设置每条消息的字节数
throughtput
: 限流控制,-1表示不限流,大于0表示限流值
producer-props
: 指定生产者的配置,可以同时指定多组配置
除此之外还有其他参数,比如 print-metrics
在测试完成之后,打印很多指标信息.有兴趣可以执行 --help
查看更多参数信息.
回过头再看看上面示例中的压测结果信息,以第一条和最后一条为例:
1 | 76666 records sent, 15333.2 records/sec (14.97 MB/sec), 1517.5 ms avg latency, 2061.0 max latency. |
records sent: 表示发送的消息综述
records/sec: 吞吐量,表示每秒发送的消息数量
MB/sec: 吞吐量,表示每秒发送的消息大小
avg latency: 表示消息处理的平均耗时
max latency: 表示消息处理的最大耗时
50th,95th,99th,99.th 表示50%,95%,99%,99.9%时消息处理耗时
消费者压测工具的脚本使用也比较简单,下面的简单实例演示了消费主题 topic-1
中的100万条消息.命令使用方法:
1 | [hadoop@bi-dev152 ~]$ kafka-consumer-perf-test.sh --topic topic-1 --messages 1000000 --broker-list localhost:9092 |
data.consumed.in.MB: 消费的消息总量,单位为MB
MB.sec: 按字节大小计算的消费吞吐量(单位:MB/s)
data.consumed.in.nMsg: 消费的消息消息总数
nMsg.sec: 按消息个数计算的吞独量(单位n/s)
可以创建多个分区,比如10,100,200,500等(副本数量都为1)来测试生产和消费的性能表现,
4.2.5.2 分区数量越多不代表吞独量越高
消息中间件的性能一般是指吞吐量(还包括延迟),吞吐量会受到硬件资源,消息大小,消息压缩,消息发送方式(同步,异步),副本因子等参数影响.分区数量越多不一定吞吐量越高,超过一定的临界值后,kafka的吞吐量会不升反降.
4.2.5.3 分区数量的上限
一味的增加分区数量并不能使吞吐量得到提升,并且分区的数量也不能一直增加,如果超过一定的临界值还会引起kafka进程的崩溃.
每次创建一个分区,都会消耗一个Linux系统的文件描述符.
通过kafka的pid编号,可以查看当前kafka进程占用的文件描述符数量:
1 | [hadoop@bi-dev152 ~]$ ls /proc/22858/fd/ | wc -l |
此时创建一个分区数量为400个的topic-demo4的主题.由于分区会平均创建在集群内的3个broker节点,所以需要统计一下152这个本地节点的分区数量.
1 | [hadoop@bi-dev152 ~]$ kafka-topics --describe --topic topic-demo4 | grep -Eo "Leader:\s[0-9]+" | sort | uniq -c |
可以看到152这个节点创建了134个分区.接下来看看系统文件描述符的数量.正好增加了134个文件描述符
1 | [hadoop@bi-dev152 ~]$ ls /proc/22858/fd/ | wc -l |
可以想见的是,一旦分区数量超过了操作系统规定的文件描述符上限,kafka进程就会崩溃
4.2.5.4 分区数量的考量
如何选择合适的分区数量,一个恰当的答案就是视具体情况而定.
从吞吐量方面考虑,增加合适的分区数量可以在一定程度上提升整体吞吐量,但是超过临界值之后吞吐量不升反降.在投入生产环境之前,应该对吞吐量进行相关的测试,以找到合适的分区数量
分区数量太多会影响系统可用性,当broker发生故障时,broker节点上的所有分区的leader副本不可用,此时如果有大量的分区要进行leader角色切换,这个切换的过程会耗费相当的时间,并且这个时间段内分区会变的不可用.并且分区数量太多不仅为增加日志清理的耗时,而且在被删除时也会消费更多时间.
一个好的建议是,创建主题之前对分区数量性能进行充分压测,在创建主题之后,还需要对其进行追踪,监控,调优.如果分区数量较少,还能通过增加分区数量,或者增加broker进行分区重分配等改进.
最后,一个通用的准则是,建议分区数量设定为集群中broker的倍数,例如集群中有3个broker节点,可以设定分区数为3,6,9等.
4.2.6 总结
kafka-topics.sh
查看,创建主题分区,副本
kafka-configs.sh
修改主题配置文件
kafka_perferred-replica-elections.sh
优先副本选举
kafka-reassign-partitions.sh
分区重分配,副本复制限流,修改副本因子数量
kafka-producer-perf-test.sh
生产者分区数和吞吐量性能压测
kafka-consumer-perf-test.sh
消费者性能压测