Kafka 从 Zookeeper 迁移到 KRaft

摘要

  • 本文介绍 如何将 Kafka 集群从 Zookeeper 模式迁移到 KRaft 模式

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

  • 官方文档:ZooKeeper到KRaft迁移

从 Zookeeper 模式迁移到 KRaft 模式(平滑迁移)

  • Kafka 官方在 3.4+ 引入了完整的 Zookeeper → KRaft 平滑迁移机制,称为 ZK to KRaft (ZkMigration)

  • 迁移背景与前提

项目 说明
支持版本 Kafka 3.4.0+(建议至少使用 3.6.x ,目前最新版为 3.9.x
迁移目的 摆脱 ZooKeeper,完全切换为 KRaft 自管理模式
迁移模式 在线迁移(无停机或最小停机)
最终目标 Kafka 的控制器与元数据完全由 KRaft 管理,不再依赖 ZooKeeper。
  • 整体迁移流程概览

阶段 控制器类型 Broker 模式 ZooKeeper 角色 KRaft 角色 特征说明
初始阶段 ZooKeeper 控制器 全部为 ZK 模式 管理所有元数据 尚未启用 所有 Broker 都运行在 ZK 模式下,由 ZK 控制器管理集群。
初始元数据加载阶段 KRaft 控制器开始加载 部分(或全部)仍为 ZK 模式 提供元数据源 从 ZK 加载元数据 KRaft 法定节点(controller.quorum.voters)从 ZK 中读取并同步当前集群元数据。
混合阶段 KRaft 控制器 部分 ZK 模式,部分 KRaft 模式 保留只读元数据 管理并更新元数据 KRaft 控制器成为主控,ZK 仍存在但只提供读取,Broker 可处于不同模式(混合状态)。
双写阶段 KRaft 控制器 全部为 KRaft 模式 接收 KRaft 同步写入 管理元数据并写入 ZK 所有 Broker 都运行在 KRaft 模式,控制器将元数据同时写入 ZK 和 KRaft 日志。
迁移完成阶段 KRaft 控制器 全部为 KRaft 模式 不再使用 独立运行 停止向 ZK 写入元数据,ZK 可安全关闭,Kafka 完全运行在无 Zookeeper 的 KRaft 模式下。

开始迁移

启动一个 Controller 节点

  • 在任意一个节点上启动一个 Controller 节点,这里为 worker1

  • 启动前需要先获取当前 Kafka 集群的 Cluster ID

1
2
3
4
5
6
7
$ zookeeper-shell.sh localhost:2181 get /cluster/id
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"version":"1","id":"hp_Q0pihQ0ORcIvXlfHobQ"}
  • 准备好 Controller 节点的配置文件,这里可以用 config/kraft/controller.properties 为模板进行修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 配置当前节点的角色,这里只能是controller
process.roles=controller
# 节点ID,不能与现有Broker节点的ID一致
node.id=3000
# 配置集群的投票节点,因为我们当前只启动了一个controller节点,所以只能配置一个投票节点
controller.quorum.bootstrap.servers=worker1:9098
# 配置监听器,注意端口不能重复
listeners=CONTROLLER://:9098
advertised.listeners=CONTROLLER://worker1:9098
controller.listener.names=CONTROLLER
# 日志存放目录,这里存放的是元数据,在格式化时这个目录必须为空目录
log.dirs=/usr/local/kafka/dataDir/kraft-meta

# 控制是否启用 ZooKeeper → KRaft 的元数据迁移过程
zookeeper.metadata.migration.enable=true

# ZooKeeper client 连接
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181

# 指定 Kafka 集群内部(broker 与 broker、KRaft 控制器与 broker)通信所使用的监听器(listener)名称。
# 注意这里要与原先的 server.properties 中配置的监听器名称一致
inter.broker.listener.name=PLAINTEXT

# 其它参数尽量保持与旧集群的配置一致
  • 启动 Controller 节点

千万不要在已有 broker 的数据目录(包含消息数据的 log.dirs)上运行 kafka-storage.sh format ,那会把原有数据结构重置或踩坏。
必须明确:格式化只针对 新 controller 的 metadata 目录(且该目录必须为空)。

1
2
3
4
5
# 格式化元数据目录,log.dirs 参数指定元数据存放目录,首次运行前必须为空目录
# -t 参数指定集群的 Cluster ID,就是前面获取的 Cluster ID
kafka-storage.sh format --standalone -t hp_Q0pihQ0ORcIvXlfHobQ -c /usr/local/kafka/kafka3/config/kraft/controller.properties
# 启动,这里没有后台启动是为了方便观察日志输出
kafka-server-start.sh /usr/local/kafka/kafka3/config/kraft/controller.properties
模式 format 命令 quorum 状态 是否从 ZK 加载
迁移阶段(standalone) --standalone 无(单节点) ✅ 是
正式 KRaft 模式 --standalone ✅ 多节点 ❌ 否(独立运行)

将原先的三个节点作为 Broker 节点重新启动

  • 修改原先的配置文件 server.properties,只需要修改如下内容即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 在最后加入 CONTROLLER:PLAINTEXT
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:PLAINTEXT

## 以下是新加入的 配置项
# Set the IBP,当前 kafka 版本是 3.9.1,所以这里设置为 3.9
inter.broker.protocol.version=3.9

# 控制是否启用 ZooKeeper → KRaft 的元数据迁移过程
zookeeper.metadata.migration.enable=true

# KRaft controller quorum configuration,因为目前只启动了一个 controller 节点,所以只能配置一个投票节点
controller.quorum.bootstrap.servers=worker1:9098
# 控制器监听器名称,要与 contreller 节点配置文件 controller.properties 中的配置一致
controller.listener.names=CONTROLLER
  • 分别重新启动三个节点

1
2
3
4
5
6
7
# 关闭 kafka
kafka-server-stop.sh
# 这里要注意 worker1 上不要使用 kafka-server-stop.sh 进行关闭,因为 worker1 上的 controller 节点 也在运行,会有两个 kafka 进程运行,可以用如下命令进行关闭;因为 controller 节点 启动使用的是 controller.properties
ps -ef | grep kafka | grep "server\.properties" | grep -v grep | awk '{print $2}' | xargs kill -9

# 重新启动 kafka
kafka-server-start.sh /usr/local/kafka/kafka3/config/server.properties
  • 当三个节点都以必要的配置重新启动后,迁移将自动开始。迁移完成后,可以在 Controller(worker1)节点 上看到类似如下日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ✅ 意味:从 ZooKeeper 到 KRaft 的初始元数据迁移已成功,共写入 62 条记录,当前 KRaft metadata offset 为 3179。这是迁移成功的明确证据。
Completed migration of metadata from ZooKeeper to KRaft. 62 records were generated in 300 ms across 1 batches. The average time spent waiting on a batch was 97.00 ms. The record types were {TOPIC_RECORD=3, PARTITION_RECORD=56, CONFIG_RECORD=3}. The current metadata offset is now 3179 with an epoch of 2. Saw 3 brokers in the migrated metadata [1, 2, 3]. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)

# ✅ 意味:控制器已加载并生效新元数据与 feature set(与 offset 3179 对应)。
Loaded new metadata Features(metadataVersion=3.9-IV0, finalizedFeatures={metadata.version=21}, finalizedFeaturesEpoch=3179). (org.apache.kafka.metadata.publisher.FeaturesPublisher)

# ✅ 意味:内部迁移状态已更新,KRaft 上有了写入位置记录。
Finished initial migration of ZK metadata to KRaft in 3486479 ns. Transitioned migration state from ZkMigrationLeadershipState{kraftControllerId=3000, kraftControllerEpoch=2, kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=1760682050169, migrationZkVersion=1, controllerZkEpoch=3, controllerZkVersion=3} to ZkMigrationLeadershipState{kraftControllerId=3000, kraftControllerEpoch=2, kraftMetadataOffset=3179, kraftMetadataEpoch=2, lastUpdatedTimeMs=1760682050169, migrationZkVersion=2, controllerZkEpoch=3, controllerZkVersion=3} (org.apache.kafka.metadata.migration.KRaftMigrationDriver)


# ✅ 意味:迁移流程按预期推进:先把 KRaft 的元数据与 ZK 对齐(sync),然后与 brokers 建立通信,最终进入 DUAL_WRITE(双写)。DUAL_WRITE 阶段表示控制器在写入 KRaft metadata log 的同时,仍然会把必要的写操作也写回 ZooKeeper(双写)——直到迁移完全完成并确认可以停止写 ZK 为止。
3000 transitioning from ZK_MIGRATION to SYNC_KRAFT_TO_ZK state
...
Performing a full metadata sync from KRaft to ZK.
Did not make any ZK writes when reconciling with KRaft state.
3000 transitioning ... to KRAFT_CONTROLLER_TO_BROKER_COMM
...
Sending RPCs to broker before moving to dual-write mode using at offset and epoch OffsetAndEpoch(offset=3179, epoch=2)
...
3000 transitioning ... to DUAL_WRITE state
  • 上面的日志总体上表明,元数据迁移已成功完成并且控制器进入了双写(DUAL_WRITE)阶段。

将三个Broker节点的配置修改为 KRaft 模式的 broker 节点

  • 修改三个节点的配置文件 server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 添加process.roles=broker
process.roles=broker
# 用 node.id 替换 broker.id,注意,node.id 需要与 broker.id 一致
# broker.id=1
node.id=1

# 去掉 zookeeper 相关配置
# Don't set the IBP, KRaft uses "metadata.version" feature flag
# inter.broker.protocol.version=3.9

# Remove the migration enabled flag
# zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  • 分别重新启动三个节点

1
2
3
4
5
6
7
# 关闭 kafka
kafka-server-stop.sh
# 这里要注意 worker1 上不要使用 kafka-server-stop.sh 进行关闭,因为 worker1 上的 controller 节点 也在运行,会有两个 kafka 进程运行,可以用如下命令进行关闭;因为 controller 节点 启动使用的是 controller.properties
ps -ef | grep kafka | grep "server\.properties" | grep -v grep | awk '{print $2}' | xargs kill -9

# 重新启动 kafka
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/server.properties

将 Controller 节点的配置修改为 KRaft 模式的 controller 节点

  • 修改 controller 节点的配置文件 controller.properties

1
2
3
4
5
6
## 去掉去下内容
# 控制是否启用 ZooKeeper → KRaft 的元数据迁移过程
# zookeeper.metadata.migration.enable=true

# ZooKeeper client 连接
# zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  • 重启启动 controller 节点

1
2
# 关闭后重新启动
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/kraft/controller.properties
  • 此时你可以关闭 zookeeper 集群了,新的 kafka 集群将不再使用 ZooKeeper,也无法在恢复到 ZooKeeper 模式。

加入新的 Controller 节点

  • Controller 尽量保持 奇数个节点。

  • 之前已经在 worker1 节点上启动了 controller ,现在 worker2worker3 上也来启动 controller 节点,并将它们加入到 kafka 集群中。

  • 在开始配置前,先将上面的 controller 节点 和 三个 broker 节点 的如下配置进行修改,并重启启动。

1
2
3
4
5
6
# 将 controller.quorum.bootstrap.servers 替换为 controller.quorum.voters
controller.quorum.voters=3000@worker1:9098,3001@worker2:9098,3002@worker3:9098
# controller.quorum.bootstrap.servers=worker1:9098

# controller.quorum.voters = 谁是正式投票成员(固定配置)
# controller.quorum.bootstrap.servers = 临时找谁引导连接(迁移或初始化用)
配置项 作用 适用阶段 是否必需 说明
controller.quorum.voters 定义 正式的 KRaft 控制器投票成员列表(voter set) 集群正常运行时 ✅ 是 所有节点必须配置相同的值
controller.quorum.bootstrap.servers 定义 迁移阶段或初始化阶段的控制器连接地址(bootstrap controller endpoint) ZK → KRaft 迁移阶段KRaft 集群初次启动 ⚙️ 可选(仅特定阶段) 用于在 controller quorum 尚未形成时的临时发现
  • 参考 worker1 上的 controller 节点的配置文件 controller.properties,配置 woker2 的 controller 节点配置文件controller.properties ,worker3 也是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 配置当前节点的角色,这里只能是controller
process.roles=controller
# 节点ID,不能与现有Broker节点的ID一致
node.id=3001
# 配置集群的投票节点
controller.quorum.voters=3000@worker1:9098,3001@worker2:9098,3002@worker3:9098
# 配置监听器,注意端口不能重复
listeners=CONTROLLER://:9098
advertised.listeners=CONTROLLER://worker2:9098
controller.listener.names=CONTROLLER
# 日志存放目录,这里存放的是元数据
log.dirs=/usr/local/kafka/dataDir/kraft-meta

# 指定 Kafka 集群内部(broker 与 broker、KRaft 控制器与 broker)通信所使用的监听器(listener)名称。
# 注意这里要与原先的 server.properties 中配置的监听器名称一致
inter.broker.listener.name=PLAINTEXT
  • 初始化日志目录

1
2
3
# 只有 Controller 节点才需要初始化日志目录
# -t 参数指定集群的 Cluster ID,就是前面获取的 Cluster ID
kafka-storage.sh format -t hp_Q0pihQ0ORcIvXlfHobQ -c /usr/local/kafka/kafka3/config/kraft/controller.properties
  • 分别启动 worker2 和 worker3 上的 controller 节点

1
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/kraft/controller.properties
  • 此时新的 controller 节点不会立刻加入选举队列,新节点初始状态默认是 observer,需要执行下面的命令将节点加入选举队列

1
2
# 分别在 worker2 和 worker3 上执行
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config /usr/local/kafka/kafka3/config/kraft/controller.properties add-controller
  • 查看集群节点状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ kafka-metadata-quorum.sh --bootstrap-server worker1:9092 describe --replication
NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
3000 RJ4oOPGgTw-KxHFNn4SmiQ 27820 0 1760696136345 1760696136345 Leader
3001 zGnWA7zYmRHG6bcTlFV2qA 27820 0 1760696136259 1760696136259 Follower
3002 gIDkhOQJEHqg-GJBdezU1Q 27820 0 1760696136257 1760696136257 Follower
2 9KeeAYKEQHT92DxqNSwYuA 27820 0 1760696136257 1760696136257 Observer
1 Q8lr8JQ2vrDS35_DrI1MxA 27820 0 1760696136257 1760696136257 Observer
3 rgQR5wd_i5hLgU97dCKIvA 27820 0 1760696136257 1760696136257 Observer

$ kafka-metadata-quorum.sh --bootstrap-server worker1:9092 describe --status
ClusterId: hp_Q0pihQ0ORcIvXlfHobQ
LeaderId: 3000
LeaderEpoch: 5
HighWatermark: 30094
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 3000, "directoryId": "RJ4oOPGgTw-KxHFNn4SmiQ", "endpoints": ["CONTROLLER://worker1:9098"]}, {"id": 3001, "directoryId": "zGnWA7zYmRHG6bcTlFV2qA", "endpoints": ["CONTROLLER://worker2:9098"]}, {"id": 3002, "directoryId": "gIDkhOQJEHqg-GJBdezU1Q", "endpoints": ["CONTROLLER://worker3:9098"]}]
CurrentObservers: [{"id": 2, "directoryId": "9KeeAYKEQHT92DxqNSwYuA"}, {"id": 1, "directoryId": "Q8lr8JQ2vrDS35_DrI1MxA"}, {"id": 3, "directoryId": "rgQR5wd_i5hLgU97dCKIvA"}]

加入 新的 Broker 节点

  • 创建新的 Broker 节点时,参考其它 Broker 节点 配置好配置文件 server.properties,并启动 Broker 节点即可。

  • 无需运行日志目录初始化命令,因为 Broker 节点只存放 消息 数据。

迁移后注意事项

  • 迁移完成后,Kafka 客户端(Producer / Consumer / AdminClient)依然连接的是 Broker 节点,而不是 Controller 节点。

  • Kafka 在 ZooKeeper 模式与 KRaft 模式下的区别主要在于:

    • 控制平面(Control Plane):ZK 模式下由 ZooKeeper + Controller Broker 共同管理;KRaft 模式下由 独立的 Controller 进程或角色 管理(通过 Raft 协议同步元数据)。
    • 数据平面(Data Plane):无论是哪个模式,客户端发送、消费消息仍然是通过 Broker 节点 完成的。
    • 也就是说,Controller 管理集群元数据(主题、分区、副本、Leader 选举等),而 Broker 节点处理实际的消息流。