Kafka 的 常用命令

摘要

  • 本文介绍 Kafka 的 常用命令

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

  • Java-Client 代码示例

topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 创建 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 3 --replication-factor 2
# --bootstrap-server 指定 kafka 集群地址
# --topic 创建的 topic 名称
# --partitions 指定分区数,不设置则默认使用 server.properties 中设置的默认值
# --replication-factor 指定副本数,不设置则默认使用 server.properties 中设置的默认值


# 列出 topic
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 topic 详情
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
## 输出
Topic: test TopicId: Ru0tWQJ4RMWcjjGsKAdWQg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
Topic: test Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
Topic: test Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
## 输出说明
# 总体信息(Topic 概览)Topic: test TopicId: Ru0tWQJ4RMWcjjGsKAdWQg PartitionCount: 3 ReplicationFactor: 3 Configs:
| 字段 | 含义 |
| ----------------------------------- | ------------------------------------------------------------- |
| **Topic: disTopic** | Topic 名称,即当前描述的主题。 |
| **TopicId: VUK7Mc9oQdS1mjGG7OhQzQ** | Kafka 内部自动生成的唯一标识符(UUID),Kafka 3.x 之后引入,用于区分同名但不同生命周期的 topic。 |
| **PartitionCount: 3** | 该主题有 3 个分区(partition)。每个分区存储一部分消息。 |
| **ReplicationFactor:** | 副本因子。这里虽然输出中没显示具体值,但可从每行分区配置推断是 **3**(每个分区有 3 个副本)。 |
| **Configs:** | topic 的配置项(例如清理策略、压缩类型等),如果为空,说明使用默认配置。 |

# 分区详情(每个 Partition 一行)

| 字段 | 含义 |
| --------------------------------- | ----------------------------------------------------------- |
| **Partition: 0** | 第 0 号分区。 |
| **Leader: 2** | 该分区当前的 **Leader Broker 是 broker ID = 2**,只有 Leader 才处理读写请求。 |
| **Replicas: 2,3,1** | 该分区的所有副本存放在哪些 Broker 上(即副本分布,AR),分别是 broker 2、3、1。 |
| **Isr (In-Sync Replicas): 2,3,1** | 当前与 Leader 保持同步的副本集合。这里所有副本都在同步中(健康状态 👍)。 |
| **Elr / LastKnownElr** | Kafka 新版本中引入的 "Enhanced Leader Replica" 状态,目前未启用(N/A)。 |


# 删除 topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

consumer

1
2
3
4
5
6
7
# 创建 consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test
# --topic 指定 topic
# --group 指定 consumer 组

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# --from-beginning 从 topic 的最开始消费

consumer-group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 列出 consumer 组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# --bootstrap-server 集群地址

# 查看 consumer 组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
## 输出
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 2 2 0 console-consumer-2102b86e-895c-4ee3-8304-6df83523d1c1 /10.250.0.7 console-consumer
test test 1 2 2 0 console-consumer-2102b86e-895c-4ee3-8304-6df83523d1c1 /10.250.0.7 console-consumer
test test 2 1 1 0 console-consumer-9ac45b29-d8f3-4649-ab09-7b567aa2ba53 /10.250.0.108 console-consumer

## 输出说明
# GROUP 消费组名称
# TOPIC topic 名称
# PARTITION 分区编号
# CURRENT-OFFSET 当前消费的 offset
# LOG-END-OFFSET topic 中最大的 offset
# LAG 当前消费的 offset 与 topic 中最大的 offset 的差值,即剩余未消费的 消息数量
# CONSUMER-ID 当前消费的 consumer 的 id
# HOST 当前消费的 consumer 的主机名
# CLIENT-ID 当前消费的 consumer 的客户端名称

# 删除 consumer 组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group test

producer

1
2
3
# 创建 producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# --topic 指定 topic

手动触发 Kafka Partitoin 的 Leader 选举(自平衡)

  • kafka的自平衡默认开启,每隔 300秒扫描一次,如果需要平衡的比例高于 10%,则会触发一次

1
2
3
4
5
6
# 开启自动平衡
auto.leader.rebalance.enable=true
# 间隔扫描时间 默认 300 秒
eader.imbalance.check.interval.seconds=300
# 触发比例,即扫描的 broker 上需要平衡的 partition 占当前 broker 全部 partition 的比例,默认 10%
leader.imbalance.per.broker.percentage=10
  • 建议关闭,改为业务低峰时手动触发

1
2
3
4
# 自动平衡
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --topic test --partition 0
# --topic 指定要触发的 topic
# --partition 0 触发 partition 0 的 leader 选举
  • 🧩 参数说明:–election-type

参数值 含义 触发条件 典型使用场景
preferred 首选 Leader 选举(Preferred Leader Election)
Kafka 会尝试将分区的 leader 重新切换为「首选副本」(通常是第一个副本)。
只有当前 leader 不是 首选副本时才执行。 某些副本被自动选举成 leader 后,希望恢复原有「首选 leader」结构,以实现负载均衡。
unclean 非干净 Leader 选举(Unclean Leader Election)
允许从不同步的副本中选举新的 leader。
仅在分区 没有可用 leader 时执行。 在紧急恢复场景下(比如所有 ISR 副本都下线),为了恢复服务可用性,即使会导致数据丢失。
  • Leader Partition⾃动平衡机制

    • Leader Partitoin选举机制能够保证每⼀个Partition同⼀时刻有且仅有⼀个Leader Partition。但是,是不是只要分配好了Leader Partition就够了呢?
    • 在⼀组Partiton中,Leader Partition通常是⽐较繁忙的节点,因为他要负责与客户端的数据交互,以及向Follower同步数据。默认情况下,Kafka会尽量将Leader Partition分配到不同的Broker节点上,⽤以保证整个集群的性能压⼒能够⽐较平均。
    • 但是,经过Leader Partition选举后,这种平衡就有可能会被打破,让Leader Partition过多的集中到同⼀个Broker上。这样,这个Broker的压⼒就会明显⾼于其他Broker,从⽽影响到集群的整体性能。
    • 为此,Kafka设计了Leader Partition⾃动平衡机制,当发现Leader分配不均衡时,⾃动进⾏Leader Partition调整。
    • Kafka在进⾏Leader Partition⾃平衡时的逻辑是这样的:他会认为AR(Replicas副本集)当中的第⼀个节点就应该是Leader节点。这种选举结果成为preferred election 理想选举结果。
    • Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition⽐例⾼于leader.imbalance.per.broker.percentage阈值时,就会触发⼀次Leader Partiton的⾃平衡。