Several ways to clean up Kafka topic messages

2024-07-28 Java

Cleaning up Kafka topic messages

Reference: https://cloud.tencent.com/developer/article/1590094

Quick deletion via configuration

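  1. Before starting Kafka, set delete.topic.enable=true in server.properties.

  2. Run bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181, or delete the topic with a cluster management tool such as kafka-manager. (On Kafka 3.0 and later, where the --zookeeper option was removed, use --bootstrap-server as in the walkthrough below.) If delete.topic.enable=true was not set before Kafka started, the topic is only flagged as "marked for deletion" and a matching child node is created under /admin/delete_topics in ZooKeeper. Once the setting is added and Kafka is restarted, the topic is actually deleted.

  3. Advantage: Kafka itself performs the deletion; all you have to do is set delete.topic.enable to true in server.properties.

  4. Disadvantage: Kafka has to be restarted for the configuration change to take effect.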

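# Defaults to false on Kafka versions before 1.0 (true since 1.0). Keep spaces away from the equals sign, otherwise the setting may not take effect
delete.topic.enable=true

# Bitnami chart environment variable (changing it triggers a restart)
KAFKA_CFG_DELETE_TOPIC_ENABLE=true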
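
Before restarting a broker it can help to confirm what the file actually says. The following is a minimal sketch, not a Kafka tool: `read_property` and the `SAMPLE` text are hypothetical names made up for illustration. It reads a key from properties-style text and flags whitespace around the equals sign, which the note above warns about.

```python
def read_property(properties_text, key):
    """Return (value, has_stray_whitespace) for `key`, or (None, False) if absent.

    If the key appears more than once, the last occurrence is reported.
    """
    result = (None, False)
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blanks, comments, and lines without a key=value separator.
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        name, _, value = raw.partition("=")
        if name.strip() == key:
            stray = name != name.strip() or value != value.strip()
            result = (value.strip(), stray)
    return result


# Hypothetical excerpt of a server.properties file.
SAMPLE = """\
# server.properties (excerpt)
delete.topic.enable=true
log.retention.hours = 4
"""

print(read_property(SAMPLE, "delete.topic.enable"))
print(read_property(SAMPLE, "log.retention.hours"))
```

The second element of each tuple is True when the line has whitespace next to the equals sign, so the second lookup is the one that would deserve a closer look.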


# Create a new topic logstash_test (with 3 replicas)
kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test --partitions 1 --replication-factor 3


# Describe topic logstash_test: the leader is broker 0 (broker.id=0), and the three replicas are on brokers 0, 1 and 2
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: 1j9d-wgvtzktpgdtto0yfq PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1

# List the topics registered in ZooKeeper
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(connected) 0] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
[zk: localhost:2181(connected) 1] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]

# Look at the directory configured as log.dirs in Kafka's server.properties
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata

# Delete topic logstash_test
I have no name!@ape-kafka-0:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test

# Describe topic logstash_test again: the error shows that the topic has been deleted
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Error while executing topic command : Topic 'logstash_test' does not exist as expected
[2024-06-26 03:13:45,323] ERROR java.lang.IllegalArgumentException: Topic 'logstash_test' does not exist as expected
        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:399)
        at kafka.admin.TopicCommand$TopicService.describeTopic(TopicCommand.scala:311)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

# List the topics in ZooKeeper again: logstash_test is gone there as well
[zk: localhost:2181(connected) 2] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]
[zk: localhost:2181(connected) 3] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]

# Check the log.dirs directory again: the logstash_test log files have been removed too
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test*
ls: cannot access '/bitnami/kafka/data/logstash_test*': No such file or directory
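
The leader and replica assignment in this walkthrough was read off the --describe output by eye, which is easy to get wrong. As a hedged illustration, the sketch below parses one partition line of that output into a dictionary. `parse_partition_line` is a hypothetical helper written for this article, not part of Kafka, and it assumes the "Key: value" layout shown in the transcript above.

```python
import re


def parse_partition_line(line):
    """Parse one partition line of `kafka-topics.sh --describe` output.

    Keys are lowercased so the function does not depend on the exact
    capitalisation of the tool's output.
    """
    fields = {k.lower(): v for k, v in re.findall(r"(\w+):\s*(\S+)", line)}
    return {
        "topic": fields["topic"],
        "partition": int(fields["partition"]),
        "leader": int(fields["leader"]),
        "replicas": [int(b) for b in fields["replicas"].split(",")],
        "isr": [int(b) for b in fields["isr"].split(",")],
    }


# Partition line copied from the describe output in this walkthrough.
LINE = "        Topic: logstash_test    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1"

info = parse_partition_line(LINE)
# Replicas that are assigned but currently missing from the in-sync set.
out_of_sync = sorted(set(info["replicas"]) - set(info["isr"]))
print(info["leader"], info["replicas"], out_of_sync)
```

For the line above this reports broker 0 as leader, replicas on brokers 0, 2 and 1, and no out-of-sync replicas.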

Manual data deletion

  1. Advantage: no Kafka restart is needed; you delete the topic's log files on disk and then remove the corresponding nodes in ZooKeeper.
  2. Disadvantage: everything is done by hand, and re-creating a topic with the same name afterwards runs into problems (method one does not have this issue).
  3. This method is not recommended: it is crude, and if a program is still consuming the topic's messages, Kafka ends up in a broken state.

# Create a new topic logstash_test (with 3 replicas)
I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic logstash_test.

# Describe topic logstash_test: the leader is broker 1 (broker.id=1), and the three replicas are on brokers 0, 1 and 2
I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: s7bpyklqrxy6gb8qwq67_a PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

# List the topics registered in ZooKeeper
[zk: localhost:2181(connected) 0] ls /brokers/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
[zk: localhost:2181(connected) 1] ls /config/topics
[__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]

# Look at the directory configured as log.dirs in Kafka's server.properties
I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata

# Delete the topic's nodes in ZooKeeper
[zk: localhost:2181(connected) 5] deleteall /brokers/topics/logstash_test
[zk: localhost:2181(connected) 6] deleteall /config/topics/logstash_test

# Delete the log files of topic logstash_test (this must be done on every node of the Kafka cluster)
rm -rf /bitnami/kafka/data/logstash_test*

# List the topics that remain
I have no name!@ape-kafka-0:/$ kafka-topics.sh --list --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092
__consumer_offsets
frontend_invoke_queue
frontend_invoke_result_log
lake_add_namelist
lake_entrylog
logstash_test

# Describe topic logstash_test again: the topic still exists, so it now has to be deleted explicitly (its data is already gone)
I have no name!@ape-kafka-1:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Topic: logstash_test    TopicId: s7bpyklqrxy6gb8qwq67_a PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

# Creating a topic with the same name after the manual deletion fails
I have no name!@ape-kafka-1:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic 'logstash_test' already exists.
[2024-06-26 03:38:34,038] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'logstash_test' already exists.
 (kafka.admin.TopicCommand$)

# Deleting the topic fails as well (things return to normal after Kafka is restarted)
I have no name!@ape-kafka-1:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
Error while executing topic command : This server does not host this topic-partition.
[2024-06-26 03:40:30,871] ERROR org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
 (kafka.admin.TopicCommand$)

Setting a deletion policy

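  1. Simple but blunt: if a program is still consuming these messages, whatever the policy removes is lost to it.
  2. The relevant parameters are listed below; set them in server.properties before starting Kafka.

# Log cleanup policy: delete or compact. It decides how expired data, or logs that have reached the size limit, are handled. Parameters given when a topic is created override it.
log.cleanup.policy=delete

# Note: two settings follow, one time-based and one size-based. If both are configured, log deletion is triggered as soon as either one is satisfied. Deletion always starts with the oldest log segments.
# How long messages are kept in Kafka. The default is 168 hours; with the value below, log older than 4 hours can be deleted, according to the policy.
log.retention.hours=4

# Once a partition's log grows beyond log.retention.bytes bytes, the oldest log segments are deleted
log.retention.bytes=37580963840

# How often, in milliseconds, the logs are checked for messages that can be deleted under the retention settings above. The default is 300000 ms; here it is set to 1000 ms.
log.retention.check.interval.ms=1000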
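
To make the "either policy triggers deletion, oldest first" rule concrete, here is a simplified model of a single retention check on one partition. It is a sketch under stated assumptions, not Kafka's implementation: `Segment` and `segments_to_delete` are hypothetical names, ages are given directly in hours, and the newest (active) segment is assumed to be exempt from deletion.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int
    size_bytes: int
    age_hours: float  # age of the newest record in the segment


def segments_to_delete(segments, retention_hours, retention_bytes):
    """Return base offsets of segments one retention check would delete.

    `segments` is ordered oldest first; the last one is the active segment
    and is never deleted in this model.
    """
    closed = segments[:-1]
    doomed = 0
    # Time-based policy: drop the oldest segments while they are too old.
    while doomed < len(closed) and closed[doomed].age_hours > retention_hours:
        doomed += 1
    # Size-based policy: keep dropping the oldest remaining segments as long
    # as the log would still hold at least retention_bytes afterwards.
    excess = sum(s.size_bytes for s in segments[doomed:]) - retention_bytes
    while doomed < len(closed) and excess >= closed[doomed].size_bytes:
        excess -= closed[doomed].size_bytes
        doomed += 1
    return [s.base_offset for s in closed[:doomed]]


# Hypothetical partition: three closed segments and one active segment.
log = [
    Segment(base_offset=0, size_bytes=100, age_hours=10),
    Segment(base_offset=100, size_bytes=100, age_hours=5),
    Segment(base_offset=200, size_bytes=100, age_hours=1),
    Segment(base_offset=300, size_bytes=50, age_hours=0),
]

by_time = segments_to_delete(log, retention_hours=4, retention_bytes=10**9)
by_size = segments_to_delete(log, retention_hours=1000, retention_bytes=120)
print(by_time, by_size)
```

In this toy log both calls remove the two oldest segments, once because they are older than 4 hours and once because the log is more than 120 bytes over the limit even without them.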

Deleting data by offset

# Generate test data
# 1. Create a new topic test with 3 partitions and 1 replica
I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --partitions 3 --replication-factor 1
Created topic test.

# 2. Produce 100 test messages
	kafka-verifiable-producer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --max-messages 100
	
# 3. Check how many messages each partition of the topic holds
I have no name!@ape-kafka-0:/$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --time -1
test:0:0
test:1:100
test:2:0

# 4. Write the JSON file below; it deletes offsets 0 through 49 of partition 1, while offset 50 itself is kept
cat <<EOF > offset.json
{"partitions":[{"topic":"test", "partition":1, "offset": 50}], "version":1}
EOF

# 5. Run the deletion
	kafka-delete-records.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --offset-json-file offset.json


# 6. Consume the messages to check the result; in the actual test, offsets 0-49 had been deleted
	kafka-console-consumer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --from-beginning
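
When more than one partition is involved, writing offset.json by hand gets tedious. The sketch below is one possible way to generate it: it parses the topic:partition:offset lines printed in step 3 and builds the same JSON structure used in step 4. `parse_offsets` and `build_delete_spec` are hypothetical helpers written for this article, not part of Kafka.

```python
import json


def parse_offsets(output):
    """Parse 'topic:partition:offset' lines into {(topic, partition): offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so a topic name containing ':' would survive.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def build_delete_spec(delete_before):
    """Build the JSON document for --offset-json-file.

    `delete_before` maps (topic, partition) to an offset; records with a
    smaller offset are deleted and the record at that offset is kept.
    """
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(delete_before.items())
        ],
        "version": 1,
    }


# Output copied from step 3 of the walkthrough.
end_offsets = parse_offsets("test:0:0\ntest:1:100\ntest:2:0")

# Delete the first half of every non-empty partition.
spec = build_delete_spec({tp: end // 2 for tp, end in end_offsets.items() if end > 0})
print(json.dumps(spec))
```

With the offsets from step 3 this produces the same content as the hand-written offset.json, a single entry for partition 1 with offset 50. The printed JSON can be saved to a file and passed to kafka-delete-records.sh via --offset-json-file.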