创建 Kafka 主题
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--create --topic my_topic_name \
--partitions 1 --replication-factor 1
用 --bootstrap-server
的原因 :
--zookeeper
会绕过 Kafka 的安全体系。用 --zookeeper
,不受认证体系的约束 , 就能成功创建任意主题--bootstrap-server
与集群进行交互,从 Kafka 2.2 后,社区推荐用 --bootstrap-server
查询所有主题的列表
bin/kafka-topics.sh \
--bootstrap-server broker_host:port --list
查询单个主题的详细数据
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--describe --topic
--zookeeper
: 集群中所有的主题详细数据--bootstrap-server
: 受到安全认证体系的约束,回权限验证,返回能看到的主题增加分区 :
--alter
: 增加某个主题的分区数bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--alter --topic \
--partitions < 新分区数 >
设置 max.message.bytes :
--bootstrap-server
: 设置动态参数--zookeeper
: 设置常规的主题级别参数bin/kafka-configs.sh \
--zookeeper zookeeper_host:port \
--entity-type topics --entity-name \
--alter --add-config max.message.bytes=10485760
设置副本同步机制的带宽 :
bin/kafka-configs.sh \
--zookeeper zookeeper_host:port \
--alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' \
--entity-type brokers --entity-name 0
为所有副本设置限速 :
bin/kafka-configs.sh \
--zookeeper zookeeper_host:port \
--alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' \
--entity-type topics --entity-name test
删除主题 :
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--delete --topic
内部主题
__consumer_offsets
: 消费位移__transaction_state
: 引入事务修改副本数 :
{"version":1, "partitions":[{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},...{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}
bin/kafka-reassign-partitions.sh \
--zookeeper zookeeper_host:port \
--reassignment-json-file reassign.json --execute
查看消费者组提交的位移数据
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_host:port \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning
查看消费者组的状态
bin/kafka-console-consumer.sh
--bootstrap-server kafka_host:port
--topic __consumer_offsets
--formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter"
--from-beginning
__consumer_offsets 占用太多的磁盘,用 jstack
查看 kafka-log-cleaner-thread
的线程状态
主题删除失败原因 :
主题无法删除的万能方法:
/admin/delete_topics
下 , 待删除主题的 znodermr /controller
,触发 Controller 重选举,刷新 Controller 缓存。 (注意 : 分区 Leader 重选举)