第5章 Kafka进阶管理
1.分区和副本机制
2.基准测试
创建topic:
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server 10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 \
--topic test-one \
--partitions 1 \
--replication-factor 1
生产者基准测试
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic test-one \
--num-records 5000000 \
--throughput -1 \
--record-size 1000 \
--producer-props bootstrap.servers=10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 acks=1
参数解释:
--topic test-one #主题名称
--num-records 5000000 #总共指定产生的数据量
--throughput -1 #指定吞吐量,-1为不指定
--record-size 1000 3数据大小
--producer-props bootstrap.servers=bootstrap.servers=10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 acks=1 #kafka集群地址和ack模式
消费者基准测试
/opt/kafka/bin/kafka-consumer-perf-test.sh \
--broker-list 10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 \
--topic test-one \
--fetch-size 1048576 \
--messages 5000000
参数解释:
--broker-list 10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 #kafka集群地址
--topic test-one #主题地址
--fetch-size 1048576 #每次拉取数据大小
--messages 5000000 #总消费数量
3.数据存储与清理
我们都知道kafka将消息存储到磁盘上来完成数据持久化,但是这里可能有些同学会产生一种无解,就是认为消费者消费完数据之后就要从kafka里删掉,这是不正确的。kafka并不会因为消费者消费后就删除数据,只是更新消费数据的偏移量。
消费者消费数据后的行为:
Kafka 中的数据被消费者消费后,并不会自动从 Kafka 中删除。消费者读取数据后,它只是更新了它的偏移量(offset),这个偏移量指向消费者在日志中已经读取到的最后一条消息的位置。这个偏移量通常存储在 Kafka 的一个特殊主题(__consumer_offsets)中。
数据的存储和日志的保留:
Kafka 的数据存储在日志文件中。这些日志文件不会因为数据被消费而被自动清理或删除。
Kafka 有两个主要的日志保留策略来决定何时清理旧数据:
+ **基于日志时间的保留**(log.retention.hours 或 log.retention.minutes):可以设置日志保留的时间长度,超过这个时间的旧数据会被删除。
+ **基于日志大小的保留**(log.retention.bytes):可以设置日志文件或分区可以占用的最大字节数。如果设置了此项,超出这个限制的旧数据会被删除。
日志文件的满载行为:
+ 当 Kafka 的日志文件达到设定的大小上限时,它不会覆盖旧数据。而是根据配置的清理策略(如上述的时间基或空间基的保留)来删除旧的日志段。
+ 如果启用了日志压缩(log.cleanup.policy 设置为 "compact"),Kafka 会保留每个键的最新消息,而删除旧的、重复的键的消息,这种方式对于保持配置数据或者变更记录很有用。
清理策略:
+ **删除策略**(delete):根据上述的时间或大小限制自动删除旧数据。
+ **压缩策略**(compact):按照消息的key整合,有相同key但是不同value值,只保留最后一个副本
因此,Kafka 提供了灵活的数据保留策略,允许根据需要调整数据保留的时间或大小,以及选择是删除还是压缩旧的日志数据。这些机制确保了 Kafka 可以在保持高效率的同时,管理存储资源并满足不同的数据保留需求。
在kafka的broker或topic配置相关参数:
配置项 | 配置值 | 说明 |
---|---|---|
log.cleaner.enable | true(默认) | 开启自动清理日志功能 |
log.cleanup.policy | delete(默认) | 删除日志 |
log.cleanup.policy | compaction | 压缩日志 |
log.cleanup.policy | delete,compact | 同时支持删除、压缩 |
基于时间的保留策略
基于日志大小的保留策略
4.消息不丢失机制
kafka消息不丢失可以从broker,生产者,消费者这三个层面来分析。
broker数据不丢失
broker可以理解为kafka的节点,broker层面通过以下几种方式来保障数据不丢失:
- 部署多个broker节点保证高可用
- 每个Topic创建多个分区,每个分区创建多个副本
- 每个ISR中所有的follower都会从leader中复制数据
总结:
- 多broker,多分区,多副本
生产者数据不丢失
当生产者连接到分区的leader写入数据时,可以通过ACK机制来确保数据是否判定为成功写入。ACK机制有三种选项:
1)ACK 的值设置为 -1(最可靠)
只有当所有分区的leader和follower都接收到数据时,才算写入成功
2)ACK 的值设置为 1
只要leader收到数据,就算写入成功
3)ACK 的值设置为 0
只要数据发送成功了,就认为成功,到底有没有保存下来不关心,但是性能最好
消费者数据不丢失
消费者并不产生数据,所以消费者主要每次消费完记录好offset偏移量的值就好,如果没读到消息,只需要从最后一次偏移量开始重新读取就行。
5.消息堆积
什么是消息堆积?
kafka本身消费数据的速度是非常快的,但是如果kafka机器本身的IO繁忙,或者网络抖动,或者消费者消费速度过慢,都有可能造成kafka的消息堆积。简单来说,消息堆积就是指数据消费速度缓慢,导致过多的消息都堆在kafka里。
如果查看消息堆积?
模拟压测:
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic test-one \
--num-records 5000 \
--throughput -1 \
--record-size 1000 \
--producer-props bootstrap.servers=10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 acks=1
使用图形界面查看:
这里的Lag就是未被消费的消息,也就是所谓的消息堆积,我们只需要将他们消费掉即可
消息堆积可能产生的原因
场景1:消费者挂掉
问题原因:
消费者代码BUG或者异常退出,或者资源不足,导致没有继续消费
解决方案:
修复消费者,给够资源,做好监控
场景2:数据库挂掉
问题原因:
正常流程是:消费者消费数据 --> 写入数据库成功 --> 更新偏移量
结果数据库出现异常,流程变成了:消费者消费数据 --> 写入数据库失败 --> 不会更新偏移量
解决方案:
修复数据库,重新发起消费
场景3:消费者消费速度慢
问题原因:
消费者资源不足,或者CPU繁忙,做好资源限制或者Pod反亲和
解决方案:
多创建几个消费者,但是分区数也要扩容相应的数量
场景4:网络延迟或抖动
问题原因:
因为网络拥塞,导致消费者消费超时,正常是50ms,临时将消费者消费超时时间调大
解决方案:
调大消费超时配置时间
场景5:异常处理
如果需要快速恢复,可以让🐕开发单独编写异常处理的程序,例如先把队列里堆积的数据导出来到文件或数据中,先恢复kafka的健康状态,后半夜再让🐕开发手动或自动处理异常的订单。
场景6:生产者生产消息太快了太多了
可以限速
6.配额限速
kafka的性能非常残暴,这是好事也是坏事,好处是可以快速处理海量消息,坏处是如果放开了跑,可能会导致kafka本身的磁盘IO和网络IO被占满,所以可以根据实际事情情况对kafka进行一些限速处理,防止因为数据量激增导致自爆。
限制producer端速率
bin/kafka-configs.sh xxxxx --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
限制consumer端速率
bin/kafka-configs.sh xxxxx --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
取消限速设置
./kafka-configs.sh xxxxx --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
./kafka-configs.sh xxxxx --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
更新: 2025-04-07 16:54:52