大数据之kafka应用
2024启kafka
kafka常见命令
-
指定jmx端口启动
kafka lsof -i :9999
JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
-
新建topic:副本不能大于kafka_server数
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --partitions 3 --replication-factor 1 --create --topic knowScript
-
查看topic的详情
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --describe --topic knowScript
-
罗列所有的topic
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --list
-
给topic拓展分区:只增大不减小
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --alter --topic knowScript --partitions 4
-
查看topic的磁盘大小,单位为Byte
/opt/kafka_2.12-3.1.0/bin/kafka-log-dirs.sh --bootstrap-server only:9092 --topic-list knowScript --describe | grep -oP '(?<=size":)\d+' | awk '{ sum += $1 } END { print sum }'
-
删除topic
/opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --topic knowScript --delete
-
查看消费者组列表
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --list
-
查看特定消费者组的消费情况
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
-
新建一个消费者组:消费一个topic即可
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group makeGroup
-
删除一个消费者组
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --delete --group makeGroup
-
往特定topic生产message,不指定key
/opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript
-
往特定topic生产message,指定key
/opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript --property parse.key=true --property key.separator=,
-
消费指定topic的message:–from-beginning,默认为最近的消息消费
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --from-beginning
-
消费topic的message,限制条数
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0 --max-messages 3
-
消费指定topic的message:指定分区的offset
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset latest --partition 0 /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0 /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset 10 --partition 0
-
查看kafka的log文件明文内容
/opt/kafka_2.12-3.1.0/bin/kafka-dump-log.sh --files /opt/kafka_2.12-3.1.0/data/kafka_logs/knowScript-0/00000000000000000000.log -deep-iteration --print-data-log
-
查看topic的message总数
-
每个分区的最小offset
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript --time -2
-
每个分区的最大offset
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript --time -1
-
-
查看指定消费者组的消费情况offset和lag
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
-
重设指定消费者组消费指定的offset(每个分区)
-
打印结果,没有执行(生产使用这个验证)
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --dry-run
-
直接执行
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --execute
-
重设指定消费者组指定分区的指定offset(topic后使用
:$partitionNum
来设定指定的partition)/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1 --execute
-
-
指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
-
生产message基准测试; num-records生产的message条数,through为-1表示不限制吞吐量,record-size表示每条record的大小为1024K, producer-props后面跟着kv的producer属性配置
/opt/kafka_2.12-3.1.0/bin/kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=only:9092 acks=-1 linger.ms=2000 compression.type=lz4
-
消费message基准测试
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-perf-test.sh --broker-list only:9092 --messages 10000000 --topic test_producer_perf
生产上重放信息
-
查看指定消费者组的消费情况offset
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
-
重设指定消费者组消费指定的offset(每个分区)
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --execute
-
打印结果,没有执行
/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset --dry-run
-
other:重设指定消费者组指定分区的指定offset(topic后使用
:partitionNum
来设定指定的partition)/opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1 --dry-run
-
-
指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
-
指定分区指定offset进行开始消费(查看某消费者组在特定的分区丢失了哪些消息)
/opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --offset 10 --partition 0
jmx
jmx的配置和开启
-
配置jmx
JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
- 配置文件配置更稳定
jmx的使用
jmx例子一
-
参考url的MBEAN
https://docs.confluent.io/platform/current/kafka/monitoring.html#kafka-monitoring-metrics-broker
-
找到category类别下面的MBEAN:
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={topicName}
-
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
-
输出列有
"time","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:Count","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:EventType","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FiveMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:MeanRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:RateUnit"
-
寻找上面的name的值,如 FifteenMinuteRate
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --date-format "YYYY-MM-dd HH:mm:ss" --attributes FifteenMinuteRate --reporting-interval 5000
- –date-format格式化时间 、 --reporting-interval指定更新时间间隔
-
-
jmx例子二
-
找到category类别下面的MBEAN
kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs
-
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
-
输出列有
"time","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:50thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:75thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:95thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:98thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:999thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:99thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Count","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Max","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Mean","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Min","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:StdDev"
-
寻找上面的name的值,如 75thPercentile、 95thPercentile
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 75thPercentile /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 95thPercentile
-
-
jmx例子三
-
找到category类别下面的MBEAN
kafka.network:type=RequestChannel,name=ResponseQueueSize
-
获取到对应的MBEAN的指标值
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
-
输出列有
-
"time","kafka.network:type=RequestChannel,name=ResponseQueueSize:Value"
-
-
寻找上面的name的值,如 Value
-
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes Value
-
-
jmx例子四(special)
-
并不是所有的MBEAN都有对应的jmx指标值;例如:kafka.log:type=Log,name=LogEndOffset
-
下面的命令会报错
/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
-
但是从
jconsole localhost:9999
的弹窗MBEAN可以找到;上面更深层的信息/opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset,topic=flink2kafka,partition=0 --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
原文地址:https://blog.csdn.net/liguanhaoyonghu/article/details/137618126
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!