自学内容网 自学内容网

[RocketMQ 5.3.1] Win11 + Docker Desktop 本地部署全流程 + 踩坑记录

时间比较仓促,部署Linux的过程我就简写啦。

0. 我的系统版本(供参考)

Windows 11 专业版 24H2, Build 26100.2161

Experience: Windows Feature Experience Pack 1000.26100.32.0

JDK: OpenJDK Amazon Corretto 21.0.4.7.1

1. 在WSL2上安装Ubuntu 24.04

我完全参照了这个安装,流程上没啥问题:Install Ubuntu on WSL2 - Ubuntu WSL documentation

遇到的坑

(1)安装之后需要以管理员身份打开。进入到命令行才算是安装完成。

(2)打开Ubuntu 24.04后无法进入命令行,显示WslRegisterDistribution failed with error: 0x80370114 后面跟着一堆问号或者啥

解决方法:控制面板→启用或关闭Windows功能→勾选 Hyper-V和Virtual Machine Platform,最好不要勾选Telnet→确定后重启电脑。

2. 安装Docker Desktop:Windows | Docker Docs

不需要付费,也不用登录,直接打开就行。遇到的坑:

(1)无法打开Docker Desktop,显示 An unexpected error was encountered while executing a WSL command…

解决方法:确认已安装的Linux子系统来自Ubuntu Distribution,并设置为默认。详见:https://stackoverflow.com/questions/76160943/docker-desktop-an-unexpected-error-was-encountered-while-executing-a-wsl-comma

3. 从Docker安装RocketMQ 5.3.1

在标题栏上搜索“rocketmq”,然后直接 pull 即可。

4. 创建一个共享网络,并启动一个命名服务。

以管理员身份打开Powershell,然后执行

docker network create rocketmq

docker run -d --name rmqnamesrv -p 9876:9876 --network rocketmq apache/rocketmq:5.3.1 sh mqnamesrv

现在我们占用了 9876 这个端口。还可以顺便用这个命令检查一下是否正常启动了:

docker logs -f rmqnamesrv

当然,从Docker Desktop也能看:

5. 启用Broker和Proxy

官方步骤在这个:Run RocketMQ in Docker | RocketMQ

但是我的步骤和官网不太一样,可以先试试我的步骤

(1)随便找个地方新建个记事本,然后重命名为broker.conf,空文件就行,啥也不用写。我建在了 D:\broker.conf

(2)在Powershell中执行

docker run -d `
  --name rmqbroker `
  --network rocketmq `
  -p 10912:10912 -p 10911:10911 -p 10909:10909 `
  -p 8080:8080 -p 8081:8081 `
  -e "NAMESRV_ADDR=rmqnamesrv:9876" `
  -v /D/broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf `
  apache/rocketmq:5.3.1 sh mqbroker --enable-proxy `
  -c /home/rocketmq/rocketmq-5.3.1/conf/broker.conf

注意:-v后面推荐使用Unix风格的绝对路径,比如我的写成 /D/broker.conf,冒号后面的部分不用改。

10909、10911、10912三个端口都应该做映射(如命令所示)。现在我们能在容器列表看到:

6. 点进 rmqbroker,确认一下 broker.conf文件真实存在

依次展开:/home/rocketmq/rocketmq-5.3.1/conf/,看有没有 broker.conf 文件

7. 回到 Exec,执行以下命令,然后重启容器

cd /home/rocketmq/rocketmq-5.3.1/conf/

echo "brokerIP1=127.0.0.1" > broker.conf

重启容器:点击右上角那个圈圈箭头

8. 如果你想,可以手动建立一个Topic,也可以不建

这次我们在 Powershell 里进入容器,然后执行新建命令,就叫 TopicTest 吧,默认集群即可

docker exec -it rmqbroker sh

sh mqadmin updateTopic -n rmqnamesrv:9876 -t TopicTest

9. Java 代码验证一下

(0)POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>5.3.1</version>
</dependency>

(1)生产者

package com.example.demo.rmq;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

@SuppressWarnings("preview")
public class Producer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
        producer.send(msg, 10000);
        System.out.println(STR."Message sent: \{new String(msg.getBody())}");

        producer.shutdown();
    }
}

(2)消费者

package com.example.demo.rmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

@SuppressWarnings("preview")
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println(STR."Received message: \{new String(msg.getBody())}");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.start();
    }
}

(3)删除 Topic

package com.example.demo.rmq;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.ExecutionException;

@SuppressWarnings("preview")
public class DeleteTopics {
    public static void main(String[] args) throws ExecutionException, InterruptedException, RemotingException, MQClientException {
        final String topicToDelete = "TopicTest";
        final String nameServerAddr = "localhost:9876";
        final String brokerAddr = "localhost:10911";

        ClientConfig config = new ClientConfig();
        config.setNamesrvAddr(nameServerAddr);

        var admin = MQClientManager.getInstance().getOrCreateMQClientInstance(config).getMQClientAPIImpl();
        admin.start();
        System.out.println(STR."Before deletion, topics available: \{admin.getTopicListFromNameServer(3000).getTopicList()}");

        // Step 1: Delete topic from NameServer
        admin.deleteTopicInNameServer(nameServerAddr, topicToDelete, 3000);

        // Step 2: Delete topic from Broker
        admin.deleteTopicInBroker(brokerAddr, topicToDelete, 3000);

        System.out.println(STR."After deletion, topics available: \{admin.getTopicListFromNameServer(3000).getTopicList()}");
    }
}

运行结果:

Before deletion, topics available: [RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TBW102, %RETRY%ConsumerGroup, rmq_sys_REVIVE_LOG_DefaultCluster, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, DefaultCluster_REPLY_TOPIC, rmq_sys_wheel_timer, 6d472f2c2a3d, rmq_sys_SYNC_BROKER_MEMBER_6d472f2c2a3d, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest]


After deletion, topics available: [RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TBW102, %RETRY%ConsumerGroup, rmq_sys_REVIVE_LOG_DefaultCluster, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, DefaultCluster_REPLY_TOPIC, rmq_sys_wheel_timer, 6d472f2c2a3d, rmq_sys_SYNC_BROKER_MEMBER_6d472f2c2a3d, RMQ_SYS_TRANS_OP_HALF_TOPIC]

可以发现末尾的 TopicTest 已经删掉了。

That's it.

---

后记:在最新版本的 RocketMQ 中,已经没有 DefaultMQAdminExt 等常用的 Admin 类了,都需要通过 ClientConfig 构建 MQClient,不知道为何官方文档迟迟没有更新。另外,已经不需要 DeleteTopicRequestHeader 等麻烦的类了。

其他内容欢迎大家补充。


原文地址:https://blog.csdn.net/as6757uyy65uy75/article/details/143438131

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!