Kafka 通信协议、SSL加密和身份验证

摘要

  • 本文介绍 Kafka 的 通信协议,以及如何开启外网访问。

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

Kafka 的 通信协议

  • Kafka 主要支持四种安全协议

协议名称 加密 认证 说明 推荐场景 理由
PLAINTEXT ❌ 否 ❌ 否 无加密、无认证(默认最简单) 开发 / 测试环境、内网集群通信 简单、易调试;网络可信,性能优先
SSL ✅ 是 ✅ 可选 使用 TLS/SSL 加密通信,可配置客户端证书认证 外网客户端访问 支持数据加密,可选认证,保证安全
SASL_PLAINTEXT ❌ 否 ✅ 是 使用 SASL(用户名密码)认证,但不加密数据 需要用户认证但局域网环境 有认证,但不加密,性能开销低
SASL_SSL ✅ 是 ✅ 是 同时支持 SASL 认证和 SSL 加密(最安全) 外网客户端访问 既有认证又加密,安全性最高
  • config/server.properties 文件中 可以看到如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 套接字服务器监听的地址。
# 如果未配置,则主机名默认等于 `java.net.InetAddress.getCanonicalHostName()` 的返回值,
# 使用监听器名称 `PLAINTEXT`,端口号为 9092。
# 格式:
# listeners = listener_name://host_name:port
# 示例:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Broker 向客户端“通告”的监听器名称、主机名和端口。
# 客户端实际会连接这个地址,而不是直接使用 listeners 的地址。
# 如果未设置,则默认使用 `listeners` 的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092

# 将监听器名称映射到安全协议类型。
# 默认情况下,监听器名称与安全协议同名。
# 例如:PLAINTEXT→PLAINTEXT、SSL→SSL、SASL_PLAINTEXT→SASL_PLAINTEXT、SASL_SSL→SASL_SSL。
# 更多细节可参考 Kafka 官方配置文档。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
配置项 作用 说明值
listeners Kafka 实际监听的地址(Broker 对外开放的端口) PLAINTEXT://:9092这里 PLAINTEXT 是监听器名称,并不是协议名称,实际上可以配置为任何值,具体协议是通过 listener.security.protocol.map 配置的映射关系来确定。
advertised.listeners Kafka 告诉客户端应该用哪个地址连接(客户端最终连的) 默认使用 listeners 的值
listener.security.protocol.map 映射监听器名称到通信安全协议(如明文、SSL、SASL 等) PLAINTEXT:PLAINTEXT,前面是监听器名称,后面是协议名称

仅需内网访问

1
2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://worker1:9092 # 这里是内网ip

允许外网访问

1
2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://161.189.227.200:9092 # 这里是外网ip

内外网都要访问(推荐双通道方式)

1
2
3
4
5
6
7
8
# 这里 INTERNAL 和 EXTERNAL 分别是自定义的监听器名称,此时内网端口为 9092,外网端口为 9093
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# 告诉客户端应该用哪个地址连接
advertised.listeners=INTERNAL://worker1:9092,EXTERNAL://161.189.227.200:9093
# 映射监听器名称到通信安全协议的映射关系
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# 集群间通信仍使用内网
inter.broker.listener.name=INTERNAL

开启 SASL_PLAINTEXT

  • 这里设置外网访问时开启 SASL_PLAINTEXT

1
2
3
4
5
6
7
8
9
10
11
12
13
# 监听地址和端口,这里内网和外网分开配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094
# 客户端建立连接后实际返回给客户端的地址
advertised.listeners=INTERNAL://worker1:9092,EXTERNAL://161.189.227.200:9094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
# 集群间通信 still use INTERNAL
inter.broker.listener.name=INTERNAL

# 认证机制(常见为 PLAIN,也可以是 SCRAM-SHA-256 或者 SCRAM-SHA-512)
# client 连接时
sasl.enabled.mechanisms=PLAIN
# broker 之间连接时,因为 inter.broker.listener.name=INTERNAL,所以 INTERNAL:SASL_PLAINTEXT 才有效
#sasl.mechanism.inter.broker.protocol=PLAIN
  • 创建 kafka_jaas.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
############################
# Kafka Broker (服务端)
############################
KafkaServer {
# 如果使用 SCRAM-SHA-256 或者 SCRAM-SHA-512 认证,则需要配置 为 org.apache.kafka.common.security.scram.ScramLoginModule required
org.apache.kafka.common.security.plain.PlainLoginModule required
# Broker 自己的身份(用于 broker 之间通信,本示例中没有使用)
username="admin"
password="admin-secret"

# 客户端可用账号,即 user_xxx,这里 xxx 为用户名,= 右边的为密码
user_admin="admin-secret"
user_alice="alice-secret"
user_bob="bob-secret";
};
  • 启动 kafka

1
2
3
# 在启动 Kafka Broker 前,设置环境变量指向 JAAS 文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka3/config/kafka_jaas.conf"
kafka-server-start.sh config/server.properties

客户端访问

  • 创建 client.conf

1
2
3
4
5
security.protocol=SASL_PLAINTEXT
# 认证机制,支持 PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,要与 服务端一致
sasl.mechanism=PLAIN
# 如果使用 SCRAM-SHA-256 或者 SCRAM-SHA-512 认证,则需要配置 为 org.apache.kafka.common.security.scram.ScramLoginModule required
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
  • 命令行访问

1
2
3
4
5
6
7
8
# 创建topic
kafka-topics.sh --create --topic test-topic --bootstrap-server=161.189.227.200:9094 --command-config=client.conf
# 查看topic
kafka-topics.sh --list --bootstrap-server=161.189.227.200:9094 --command-config=client.conf
# 创建消费者,--group 指定消费者组名称
kafka-console-consumer.sh --bootstrap-server=161.189.227.200:9094 --topic test-topic --consumer.config=client.conf --group=test-group
# 创建生产者
kafka-console-producer.sh --bootstrap-server=161.189.227.200:9094 --topic test-topic --producer.config=client.conf

开启 SASL_SSL

  • 这里设置外网访问时开启 SASL_SSL

创建证书

  • 官方文档

  • 生成 Broker keystore,用于 存储 broker 的私钥和证书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
keytool -keystore kafka.server.keystore.jks \
-alias broker -validity 3650 \
-genkey -keyalg RSA \
-dname "CN=broker, OU=Kafka, O=YourOrg, L=City, ST=State, C=CN" \
-storepass 123456 \
-keypass 123456
## 参数说明:
# -keystore:生成的 keystore 文件路径
# -alias broker:证书别名
# -validity 3650:有效期 3650 天
# -keyalg RSA:密钥算法
# -dname:证书信息
# -storepass:keystore 密码
# -keypass:密钥密码
  • 导出 Broker 证书(用于客户端 truststore),生成 kafka.server.crt,客户端会用它来验证 broker。

1
2
3
4
5
6
7
8
keytool -keystore kafka.server.keystore.jks \
-alias broker -export -file kafka.server.crt \
-storepass 123456
## 参数说明:
# -keystore:keystore 文件路径
# -alias broker:证书别名
# -file kafka.server.crt:导出的证书文件路径
# -storepass:keystore 密码
  • 生成 Broker truststore,truststore 用于 存储信任的证书(这里把自己生成的证书导入进去即可),生成 kafka.truststore.jks

1
2
3
4
5
6
7
8
9
10
# 注意:这里 server 端 和 client 端 可以共用一个 truststore,也可以分别创建
keytool -keystore kafka.truststore.jks \
-alias broker -import -file kafka.server.crt \
-storepass 123456 -noprompt
# 参数说明:
# -keystore:生成的 truststore 文件路径
# -alias broker:证书别名
# -file kafka.server.crt:导入的证书文件路径
# -storepass:truststore 密码
# -noprompt:不提示

server.properties 配置 SASL_SSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 监听地址和端口,这里内网和外网分开配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095
# 客户端建立连接后实际返回给客户端的地址
advertised.listeners=INTERNAL://worker2:9092,EXTERNAL://161.189.227.200:9095
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL

# SASL
# 认证机制(常见为 PLAIN,也可以是 SCRAM-SHA-256 或者 SCRAM-SHA-512)
# client 连接时
sasl.enabled.mechanisms=PLAIN
# broker 之间连接时,需要 inter.broker.listener.name=SASL_PLAINTEXT 才有效
#sasl.mechanism.inter.broker.protocol=PLAIN


# SSL
ssl.keystore.location=/usr/local/kafka/kafka3/config/ssl/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/local/kafka/kafka3/config/ssl/kafka.truststore.jks
ssl.truststore.password=123456
# 如果不要求客户端证书,可以设置 none ,要求则设置为 required
ssl.client.auth=none
  • 启动 kafka 前同样需要先创建好 kafka_jaas.conf,与 SASL_PLAINTEXT 一样。

客户端访问

  • kafka.truststore.jks 拷贝到客户端

  • 与 SASL_PLAINTEXT 一样,创建 client.conf,并添加如下信息

1
2
3
4
5
6
7
8
9
10
11
security.protocol=SASL_SSL
# 认证机制,支持 PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,要与 服务端一致
sasl.mechanism=PLAIN
# 如果使用 SCRAM-SHA-256 或者 SCRAM-SHA-512 认证,则需要配置 为 org.apache.kafka.common.security.scram.ScramLoginModule required
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";

# SSL 配置
ssl.truststore.location=/Users/hanqf/develop_soft/kafka/kafka3/config/ssl/kafka.truststore.jks
ssl.truststore.password=123456
# 禁用主机名验证,否则会校验证书的 SAN,证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空
ssl.endpoint.identification.algorithm=

PEM 证书

  • Kafka 的 证书 默认使用 JKS 格式,但从 2.7.0 开始支持 PEM 格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 监听地址和端口,这里内网和外网分开配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095
# 客户端建立连接后实际返回给客户端的地址
advertised.listeners=INTERNAL://worker2:9092,EXTERNAL://161.189.227.200:9095
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL

# SASL
# 认证机制(常见为 PLAIN,也可以是 CRAM-SHA-256、SCRAM-SHA-512)
# client 连接时
sasl.enabled.mechanisms=PLAIN
# broker 之间连接时,需要 inter.broker.listener.name=SASL_PLAINTEXT 才有效
#sasl.mechanism.inter.broker.protocol=PLAIN


# SSL-PEM
ssl.keystore.type=PEM # 指定证书类型是PEM,支持的类型 PEM、JKS
ssl.keystore.location=/usr/local/kafka/kafka3/config/ssl/fullchain.pem # 包含私钥和公钥
# 指定客户端使用的证书类型是PEM
ssl.truststore.type=PEM
ssl.truststore.location=/usr/local/kafka/kafka3/config/ssl/server.crt # 公钥

# 如果不要求客户端证书,可以设置 none ,要求则设置为 required
ssl.client.auth=none
  • client.conf 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
security.protocol=SASL_SSL
# 认证机制,支持 PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,要与 服务端一致
sasl.mechanism=PLAIN
# 如果使用 SCRAM-SHA-256 或者 SCRAM-SHA-512 认证,则需要配置 为 org.apache.kafka.common.security.scram.ScramLoginModule required
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";

# SSL 配置,将 server 端的 server.crt 拷贝到 client 端
ssl.truststore.location=/Users/hanqf/develop_soft/kafka/kafka3/config/ssl/server.crt
ssl.truststore.type=PEM

# 禁用主机名验证
ssl.endpoint.identification.algorithm=

jks 证书转换为 pem 格式

  • 从 JKS 导出为 PKCS#12 (.p12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
keytool -importkeystore \
-srckeystore kafka.server.keystore.jks \
-srcstoretype JKS \
-destkeystore kafka.server.p12 \
-deststoretype PKCS12 \
-srcstorepass 123456 \
-deststorepass 123456 \
-J"-Djdk.tls.disabledAlgorithms=" \
-J"-Dkeystore.pkcs12.legacy=false"

## 说明:
# -srcstoretype JKS:原始格式;
# -deststoretype PKCS12:转换为通用格式;
# .p12 是 PEM 的“中间格式”。
  • 导出证书[公钥] (.crt,这里是 PEM 格式)
1
2
3
4
5
6
7
8
openssl pkcs12 -in kafka.server.p12 -clcerts -nokeys -out server.crt -password pass:123456 -provider legacy -provider default
## 说明:
# -clcerts:只导出证书;
# -nokeys:不导出密钥;
# -out server.crt:导出文件名;
# -password:kafka.server.p12的密钥密码,注意密码前面加上 pass:
# -provider legacy:启用旧算法支持模块,在 OpenSSL 3.0(及更高版本)中,引入了一个新机制 —— Provider(算法提供者),默认情况下,OpenSSL 只加载 modern provider(default provider),而许多老旧算法(例如 RC2、MD5、DES、SHA1)被移到了一个单独的 legacy provider 模块中。
# -provider default:同时启用默认 provider,因为有些命令(比如涉及现代加密算法或证书签名)还依赖默认 provider,所以两者一起使用最安全、最兼容
  • 导出私钥 (.key,这里是 PEM 格式)
1
2
3
4
5
6
openssl pkcs12 -in kafka.server.p12 -nocerts -out server.key -nodes -password pass:123456 -provider legacy -provider default
## 说明:
# -nocerts:只导出密钥;
# -out server.key:导出文件名;
# -nodes:不加密导出的密钥文件
# -password:kafka.server.p12的密钥密码,注意密码前面加上 pass:
  • fullchain.pem
1
openssl pkcs12 -in kafka.server.p12 -out fullchain.pem -nodes -password pass:123456 -provider legacy -provider default