上海简站商贸有限公司,网站建设艾金手指六六12,太原建设网站,什么网站建设策划方案 论文目录 通过修改保存时间来删除消息★ 删除指定主题的消息演示1、修改kafka检查过期消息的时间间隔2、修改主题下消息的过期时间3、查看修改是否生效4、先查看下主题下有没有消息5、添加几条消息看效果6、查看消息是否被删除 ★ 恢复主题的retention.ms配置1、先查看没修改前的te… 目录 通过修改保存时间来删除消息★ 删除指定主题的消息演示1、修改kafka检查过期消息的时间间隔2、修改主题下消息的过期时间3、查看修改是否生效4、先查看下主题下有没有消息5、添加几条消息看效果6、查看消息是否被删除 ★ 恢复主题的retention.ms配置1、先查看没修改前的test2主题的配置信息2、 将test2主题下的消息的保存时间删除。3、再查看修改后的test2主题的配置信息 通过修改保存时间来删除消息
★ 删除指定主题的消息
Kafka并没有提供直接删除特定主题下消息的方法只能是强制让消息过期之后再来删除消息。
因此需要指定如下两个配置 控制将指定主题下消息的保存时间设为一个很短时间 retention.ms为特定主题设置 控制 Kafka 尽快去检查消息是否过期默认是5分钟检查一次。 log.retention.check.interval.ms
▲ 可通过如下两个属性来强制删除指定主题的消息
retention.ms属性将该属性设为一个极短的时间比如1000。
log.retention.check.interval.ms设置Kafka轮询检查的时间间隔比如将该该属性设为300000。
这意味着每5分钟Kafka会对指定主题的消息执行一次检查检查消息是否过期超过retention.ms属性设置的时间值如果过期则删除这些消息。
演示
官网显示Kafka 默认是5分钟检查一次主题下是否有消息过期 1、修改kafka检查过期消息的时间间隔
为了方便演示我这里改成120秒检查一次演示完要记得注释掉这个配置不然这么频繁检查会很卡。
配置修改后要重启下kafka。 2、修改主题下消息的过期时间
然后修改test2主题下的消息的保存时间改成1秒就是消息存在一秒后就过期。
▲ 将test2主题下的消息的保存时间设为1秒 kafka-configs --bootstrap-server localhost:9092 ^--alter ^--entity-name test2 ^--entity-type topics ^--add-config retention.ms10003、查看修改是否生效
修改完消息的过期时间那么就查看下是否生效
命令查看 使用 kafka-configs.bat 命令的 --describe 子命令 ——该命令可查看所有对象的信息 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics主题下消息的保存过期时间已经修改成1秒了
CMAK界面查看 4、先查看下主题下有没有消息
查看端口为9092的kafka节点主题是test2查看内容是从beginning开始就存在的消息 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamptrue ^--property print.keytrue ^--property print.offsettrue ^--property print.partitiontrue此时这个test2主题下没有任何消息为方便演示先添加几条消息进去
5、添加几条消息看效果
添加带 key 的消息相当于生产者发送消息到指定的主题里面去
kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.keytrue然后马上查看发现消息发送成功test2主题下有这些消息为了方便我命令再拷贝一份 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamptrue ^--property print.keytrue ^--property print.offsettrue ^--property print.partitiontrue再检查一遍消息的存活时间是存活1秒没错 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics6、查看消息是否被删除
kafka每个120就会去检查是否有过期的消息有的话就把过期的消息删除掉我又把消息的过期时间设置为1秒所以看过期的消息在120后是否会被kafka检查到并删除掉
接下来就等120秒发现过期消息已经被成功删除了
查看 test2 主题下从beginning 一开始到现在的所有消息 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamptrue ^--property print.keytrue ^--property print.offsettrue ^--property print.partitiontrue演示完把这个设置给注释掉恢复成默认的默认是每5分钟检查一次。 ★ 恢复主题的retention.ms配置
若要指定主题的retention.ms配置依然使用默认值则只要使用kafka-configs.bat命令的 --delete-config 选项删除该配置即可。例如如下命令
因为上面同通过 --add-config retention.ms1000 把消息的过期时间设置为1秒这个设置肯定是不合理的所以需要把这个设置给删除掉。
1、先查看没修改前的test2主题的配置信息 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics2、 将test2主题下的消息的保存时间删除。
–delete-config retention.ms 就是删除 retention.ms 这个配置属性 –alter ^ 表示修改的意思
kafka-configs.bat --alter ^
--bootstrap-server localhost:9092 ^
--entity-type topics ^
--entity-name test2 ^
--delete-config retention.ms3、再查看修改后的test2主题的配置信息
命令行查看 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics发现 retention.ms 这个配置属性 已经被成功删除了表示消息的过期时间恢复成默认的168小时了。
这个就是默认的消息过期时间 CMAK 查看 retention.ms 这个配置属性恢复成默认的了