自学内容网 自学内容网

centos kafka单机离线安装&kafka服务化&kafka tool连接kafka

a.版本&环境

linux版本:centos7.6

kafka: kafka_2.12

zookeeper:zookeeper_3.6.3(之前已经安装:linux zookeeper安装并服务化-CSDN博客)

java:1.8(之前已经安装)

windows kafka tool: 2.1

b.kafka单机安装

1.切换目录 cd downloads/,利用rz命令,上传kafka 及kafka eagle压缩包至服务器

2.解压文件:tar -zxvf kafka_2.12-2.6.2.tgz

创建文件夹:mdkir -p /opt/tool/kafka

创建文件夹:mdkir -p /home/kafka/log

剪切文件至指定目录:mv kafka_2.12-2.6.2 /opt/tool/kafka/

切换目录:cd /opt/tool/kafka/kafka_2.12-2.6.2/

3.修改配置并开放9092端口

执行:vim config/server.properties,修改以下配置

#机器在集群中的唯一标识, 和zookeeper的myid性质一样,每台服务器的broker.id都不能相同

broker.id=151

#kafka对外提供服务的端口默认是9092 这个IP不能用0.0.0.0这种


listeners=PLAINTEXT://192.168.42.151:9092 

advertised.listeners=PLAINTEXT://192.168.42.151:9092

消息存放的目录, 这个目录可以配置为", "逗号分割的表达式, 上面的num.io.threads要大于这个

目录的个数, 如果配置多个目录, 新创建的topic将消息持久化的地方是, 当前以逗号分割的目录中,

哪个分区数最少就放那一个

log.dirs=/home/kafka/log

#每个日志文件删除之前保存的时间, 默认168小时(7天)

#发送到Queue中的消息, 在被消费之后, 不会马上被删掉, 而是有后台进程在4天后去删除

log.retention.hours=96

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

zookeeper.connect=192.168.42.151:2181

#Timeout in ms for connecting to zookeeper, 默认6秒, 改成60秒, 因为systemd文件自启动

kafka时候需要等待zookeeper先启动完毕

zookeeper.connection.timeout.ms=60000

c.kafka测试及常用指令
  1. 切换目录:cd/opt/tool/kafka/kafka_2.12-2.6.2/bin

2.创建my-test1,my-test topic

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --create --topic my-test topic --replication-factor 1 --partitions 3

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --create --topic my-test1 topic --replication-factor 1 --partitions 3

3.修改topic分区数

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --alter --topic my-test1 topic --partitions 5

4.查看topic列表

./kafka-topics.sh --list --bootstrap-server 192.168.42.151:9092

5.删除topic

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --delete --topic my-test topic

6.发布消息

./kafka-console-producer.sh --bootstrap-server 192.168.42.151:9092 --topic my-test1

7.消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.42.151:9092 --topic my-test1 --group group-test

8.查看消费组列表

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --list

9.查看消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test

10.查看消费组members

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --members

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --state

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --members --verbose

11.指定内容查看消息

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic paymentmgt_download_asyn_deal --from-beginning | grep "1291248658964021250"

d.kafka服务化

切换目录:cd /usr/lib/systemd/system

创建文件:cd /usr/lib/systemd/system

打开文件并新增如下内容:vim kafka.service

[Uint]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=forking
Environment=JAVA_HOME=/opt/tool/java/jdk1.8.0_311
ExecStart=/opt/tool/kafka/kafka_2.12-2.6.2/bin/kafka-server-start.sh -daemon /opt/tool/kafka/kafka_2.12-2.6.2/config/server.properties
ExecStop=/opt/tool/kafka/kafka_2.12-2.6.2/bin//kafka-server-stop.sh
Restart=always
StartLimitInterval=60
StartLimitBurst=86400

[Install]
WantedBy=multi-user.target

使kafka.service立即生效执行命令:systemctl daemon-reload

e.kafka tool连接kafka

开放zookeeper端口:firewall-cmd --zone=public --add-port=2181/tcp --permanent

开放kafka端口:firewall-cmd --zone=public --add-port=2181/tcp --permanent

使增加的端口立即生效:systemctl daemon-reload

如果连接不成功,可以通过查看端口是否被用,来判断zookeeper及kafka是否启动成功:netstat -tunlp |grep 2181

netstat -tunlp |grep 9092


原文地址:https://blog.csdn.net/weixin_39794723/article/details/144353804

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!