4.1.1 创建主题
4.1.1.1 自动创建主题以及分区副本
在之前的笔记中提到了创建主题的一个简单示例.kafka提供 kafka-topics.sh
脚本来创建主题.下面这个示例创建了一个 topic-test
的主题,包含4个分区和2个副本.
1 | /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-test --replication-factor 2 --partitions 4 |
分区创建完成后,会在kafka的 log.dirs
或者 log.dir
的目录下创建相应的主题分区.下面是在其中一台Broker节点的信息展示:
1 | [hadoop@bi-dev152 ~]$ ls /opt/logs/kafka/ | grep "topic-test" |
可以看到152节点中创建了2个文件夹 topic-test-0 和 topic-test-2,对应主题 topic-test的2个分区编号为0和2的分区,命名方式可以概括为 <topic>-<partition>
.严谨地说,其实这类文件夹对应的不是分区,分区同主题一样是一个逻辑的概念而没有物理上的存在.并且这里我们也只是看到了2个分区,而我们创建的是4个分区,其余2个分区被分配到了153和154节点中,参考如下:
1 | #153节点 |
三个broker节点一共创建了8个文件夹,这个数字8实质上是分区数4与副本因子2的乘积.每个副本(或者更确切地说应该是日志,副本与日志一一对应)才真正对应 了一个命名形式.
主题,分区,副本和日志的关系如下图所示.主题和分区是提供给上层用户的抽象,而在副本层面(或者更确切的说是Log日志层面)才会实际物理存在.
同一个分区中的多个副本必须分布在不同broker中,并且一个分区副本同时存在多个broker中,这样才能提供有效的数据冗余.上面的示例中,每个副本都分布在至少2台不同的broker中.
4.1.1.2 手动创建主题以及分区副本
通过 kafka-topics.sh
脚本创建的主题会按照内部既定逻辑来分配分区和副本到Broker节点上.其实该脚本还提供一个 replica-assignment
参数来手动指定分区副本的分配方案.用法如下:
1 | 格式为: 分区1broker节点1:分区1broker节点2,分区2broker节点1:分区2broker节点2.副本集合用冒号隔开,分区之间用逗号隔开 |
例如下面这个实例通过手动方式创建了一个和 topic-test
一样分区副本分配的 topic-test-same
主题.
下面是刚创建的自动分配的topic-test主题
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic "topic-test" |
通过 --replica-assignment
手动指定分区副本分配情况
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-test-same --replica-assignment 153:152,154:153,152:154,153:154 |
–replica-assignment参数其实就是逗号隔开的所有分区的Replicas副本集合.副本集合内部用:冒号隔开
查看 topic-test-same
分区信息.和 topic-test
主题分区副本分配一致
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic "topic-test-same" |
手动分配分区副本需要遵循以下原则,否则会报错:
- 同一个分区内的副本不能有重复.比如153:153
- 分区之间所指定的副本数量要相同.比如153:154,152,154:152
- 不能跳过一个分区.比如153:154,,154:152
4.1.1.3 自定义相关参数
在创建主题时,还可以通过 config
参数设置要创建主题的相关参数.可以覆盖原本的默认配置参数. config
可以指定多个参数.用法如下:
1 | --config 参数名=值 --config 参数名=值 ...... |
下面示例使用 config
参数创建主题 topic-config
.并且携带2个参数 :
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-config --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config max.message.bytes=10000 |
查看主题信息:
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-config |
通过zk也能查看到config信息,config信息保存在 /config/topics/TOPIC_NAME
目录下:
1 | [zk: localhost:2181(CONNECTED) 0] get /config/topics/topic-config |
4.1.1.4 总结
创建主题时需要遵循几个原则
- 主题名不能重复,否则会报错.(使用
if-not-exists
参数可以避免出现报错信息,但是不会成功创建一个同名主题) - 主题名不推荐使用__双下划线开头的命名,双下划线开头主题一般看做Kafka的内部主题
- 主题名由大小写祖母,数字,点号,下划线,连接线等组成.不能只有特殊符号
kafka-topics.sh
创建主题信息支持以下参数:
--create
创建主题--replica-assignment
手动创建主题的分区副本分配--config
手动指定参数
4.1.2 查看主题的分区和副本信息
4.1.2.1.查看具体某个topic的信息
kafka-topics.sh
脚本提供了 --describe
参数来查看一个topic的信息:
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic "topic-test" |
在上面的示例中,命令行提供了以下几个信息:
一共有3个broker节点:152,153,154
PartitionCount
表示一共有3个分区
ReplicationFactor
副本因子为2
Leader
表示某个分区对应的leader副本在具体的Broker节点
Replicas
表示分区内所有AR副本的集合
Isr
表示ISR副本集合
4.1.2.2 查看所有topic的信息
如果 kafka-topics.sh
脚本没有指定具体的 --topic
字段.则会展示所有的topic主题:
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe | head |
4.1.2.3 zookeeper查看topic信息
zookeeper提供了 zkCli.sh
客户端.使用客户端连接zookeeper:
1 | [hadoop@bi-dev152 ~]$ /opt/zookeeper-3.4.10/bin/zkCli.sh -server localhost:2181 |
zookeeer的 /brokers/topics
目录下保存了主题的分区副本分片方案.通过查看这个目录即可查看主题的分区和副本信息:
1 | [zk: localhost:2181(CONNECTED) 2] get /brokers/topics/topic-test |
如上示例所示, "2":[152,154]
表示分区2分配了2个副本,分别在152和153这2个broker节点上.
4.1.2.4 查看kafka集群当前所有主题
--list
参数可以列出当前的所有topic
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list |
4.1.2.5 查看主题其他详细信息
kafka-topics.sh
脚本的 describe
参数还支持很多额外的指令,用于查看更详细的信息.
1.--topics-with-overrides
参数表示查看覆盖配置的主题,列出包含了与集群不一样配置的主题.下面列出了 topic-config
这个主题,这个主题使用了 --config
参数创建
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topics-with-overrides |
2.--under-replicated-paritions
参数列出包含失效副本的分区.失效副本的分区可能正在进行同步操作,也有可能同步发生异常.此时分区的ISR集合小于AR集合.失效副本的分区是重点监控对象,因为这可能意味着集群中的某个broker已经失效或者同步效率降低等.
正常情况下此命令不会出现任何信息.例如查看主题 topic-demo
的失效副本信息,但是没有任何输出信息
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo --under-replicated-partitions |
此时将153这个节点下线.再次查看:
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo --under-replicated-partitions |
可以看到Leader和ISR集合中都没有了153这个节点.将153节点上线.此时再次查询,恢复正常.
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo --under-replicated-partitions |
\3. unavailable-partitions
参数可以查看主题中没有leader副本的分区.这些分区已经处于离线状态,对于生产者或者消费者来说不可用.
同样正常情况下,该命令没有展示任何信息.
例如,下面的 topic-test
主题有4个分区,每个分区有2个副本.其中分区1和分区3的副本ISR是153和154这2个节点
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-test |
现在停掉153和154这2个节点的kafka进程.使用 unavailable-partitions
参数查看分区信息
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-test --unavailable-partitions |
leader显示为-1,表示没有可用leader
节点恢复后,再次执行该命令,没有任何显示
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-test --unavailable-partitions |
4.1.2.6 总结
kafka-topics.sh
查看主题信息支持以下参数:
--describe
- 默认展示所有topic的分区副本信息
--topic TOPIC_NAME
展示具体某个topic主题的分区副本信息--topics-with-overrides
列出覆盖配置参数的主题--under-replicated-partitions
列出失效副本的主题分区信息--unavailable-partitions
列出没有副本的主题分区
--list
列出kafka集群下的所有topic主题名称
4.1.3 修改主题
4.1.3.1 修改主题分区数量
当一个主题被修改后,依然允许我们对其做一定的修改,比如修改分区个数,修改配置等.这个功能就是 kafka-topic.sh
脚本中的 alter
指令提供的.
以 topic-config
主题为例,该主题下只有一个分区.将分区修改为3:
1 | [hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --partitions 3 |
--partition
参数表示扩展后的分区个数.
注意告警信息.如果主题中的消息包含key(key不为Null)时,根据key计算分区的行为就会受到影响.当分区数为1时,所以key的消息都会发送到这个分区.当分区扩展到3,会根据消息的key来计算区号.原本发往分区0的消息可能会发送到分区1或者2.此外,还会影响既定消息的顺序.
对于基于key计算的主题,不建议修改分区数量.在一开始就设置好分区数量.另外需要注意的是,Kafka不支持减少分区.只能增加不能减少.
1 | hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --partitions 1 |
不支持减少分区主要是考虑到保障kafka的消息可靠性和顺序性,事务性问题.
如果修改一个不存在的主题分区,则会报错.添加 --if-exists
参数会忽略一些异常
1 | hadoop@bi-dev152 ~]$ /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-none-exist --partitions 3 |
4.1.3.2 修改主题配置
还可以使用 kafka-topics.sh
脚本的 alter
指令修改主题的配置.在创建主题的时候通过 config
参数来设置要创建的主题相关参数.在创建完主题之后,还可以通过 alter
和 config
配合增加或者修改一些配置文件覆盖原有的值
下面例子演示修改主题 topic-config
的 max.message.bytes
配置.从10000修改到20000
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --config max.message.bytes=20000 |
通过 alter
也可以删除创建主题时候的自定义配置.使用 --delete-config
参数.下面这个例子中删除了 max.message.bytes
配置.
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-config --delete-config max.message.bytes |
注意.在对config配置进行增删改查时候,都会提示建议使用kafka-configs.sh这个脚本来实现.该脚本的使用方式下面马上讲到
4.1.4 配置管理
kafka-configs.sh
脚本专门用来对配置进行操作.可以在运行状态下动态更改配置.也可以查询主题的相关配置.而且该脚本不仅可以支持主题相关配置修改,还可以修改broker,用户和客户端这3个类型的配置
kafka-configs.sh
脚本使用 entity-type
参数指定操作配置的类型, entity-name
参数指定操作配置的名称.
4.1.4.1 查询配置
下面这个例子查看主题 topic-config
的配置
1 | [hadoop@bi-dev152 ~]$ kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name topic-config |
--entity-type
指定查看的实体类型.支持以下几种类型:
- topics
- clients
- users
- brokers
--entity-name
配置的实体名称:
- topic name (主题名称)
- client id (客户端ID)
- user principal name (用户名)
- broker id (kafka节点ID)
如果不指定 --entity-name
参数则会查询所有的 entity-type
对应的所有配置信息
1 | [hadoop@bi-dev152 ~]$ kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics |
通过zookeeper也可以查询主题的配置信息.路径为 /config/topics/TOPIC_NAME
1 | [zk: localhost:2181(CONNECTED) 3] get /config/topics/topic-config |
4.1.4.2 修改配置
使用 alter
对配置进行变更.需要配合 add-config
或者 delete-config
这2个参数一起使用.
add-config
参数实现配置的增,改
下面的例子中,为主题 topic-config
添加 max.message.bytes
参数配置和 cleanup.policy
参数配置
1 | [hadoop@bi-dev152 ~]$ kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic-config --add-config cleanup.policy=compact,max.message.bytes=20000 |
delete-config
参数可以实现配置删除.
下面的例子中,删除上面的2个配置
1 | [hadoop@bi-dev152 ~]$ kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic-config --delete-config cleanup.policy,max.message.bytes |
4.1.5 删除主题
如果确定不再使用一个主题,那么最好的方式是将其删除.这样可以释放一些资源,比如磁盘,文件句柄等. kafka-topics.sh
脚本中的 delete
命令可以用来删除主题.比如下面删除主题 topic-demo1
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-demo1 |
注意.必须将kafka服务器配置文件的delete.topic.enable选项设置为true才能删除.这个参数的默认值是false.删除主题的操作会被忽略.主题并没有被删除
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --list | grep topic-demo1 |
编辑配置文件 /opt/kafka/config/server.properties
修改下面的参数为true
1 | # Switch to enable topic deletion or not, default value is false |
如果删除一个kafka的内部主题,那么会报错
1 | [hadoop@bi-dev152 ~]$ kafka-topics.sh --zookeeper localhost:2181 --delete --topic __consumer_offsets |
删除一个不存在的主题也会报错,此时可以通过 if-exists
参数来忽略异常.
4.1.5 总结
下面这张图是 kafka-topics.sh
脚本的常用参数