自学内容网 自学内容网

windows下安装rabbitMQ并开通管理界面和允许远程访问

如题,在windows下安装一个rabbitMQ server;然后用浏览器访问其管理界面;由于rabbitMQ的默认账号guest默认只能本机访问,因此需要设置允许其他机器远程访问。这跟mysql的思路很像,默认只能本地访问,要远程访问需要另外设置,并且应该是新增一个账号来支持远程。这种做法一下子看上去很奇怪,数据库、消息队列天然就是要大家共同使用的,只能本机使用有什么意义?但细想好像又没有什么毛病,虽然有点不方便,但安全意识是有了。

以下是我初次在项目中使用rabbitMQ的一点记录。

一、安装

在windows下,当然就是下载windows的安装包。但rabbitMQ依赖一种叫ErLang的东东,安装时会先检查。如果没有的话,还要去下载erlang。这破东西100多兆,比rabbitMQ的安装包大多了。喧宾夺主。
在这里插入图片描述
先安装erlang(就是这个otp_win64_**.exe),然后再安装rabbitmq。

二、开通管理界面

安装好rabbitmq之后,会自动在windows里创建一个服务。

安装过程中,可知rabbitMQ有两个默认端口:5672和15672。5672用于编码,15672用于管理界面。

在这里插入图片描述
但是rabbitmq也并不默认打开这个管理界面,需要额外设置:

1、打开RabbitMQ的安装路径的sbin目录,

比如
在这里插入图片描述

2、键入cmd,打开命令窗口

3、输入命令:

rabbitmq-plugins.bat enable rabbitmq_management

即可用浏览器访问管理界面。如前所示。

三、允许远程访问

至此rabbitMQ只能本机访问,比如用账号guest/guest。设置允许远程访问步骤如下:

1、创建一个新账号

当然也可以设置guest允许远程访问,但这不符合安全思想。
在这里插入图片描述

2、给新账号赋权限

1)点击新建的账号
在这里插入图片描述
2)这2个按钮都点一下
在这里插入图片描述
3)有权限了
在这里插入图片描述

3、重启rabbitMQ服务

四、java写入示例

如果每次访问rabbitMQ,都需要连接一次,开销太大,因此使用连接池,每次用完放回池中,用于下次再用。

1、rabbitMQ连接池

<!--rabbitMQ-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;

/**
 * 连接池
 * 提高性能,不必每次发送消息都构建连接
 */
@Component
public class RabbitMQConnectionPool {
    private static ObjectPool<Connection> pool;

    @Value(value = "${rabbitmq.host:localhost}")
    private String host;
    @Value(value = "${rabbitmq.port:5672}")
    private int port;
    @Value(value = "${rabbitmq.username:guest}")
    private String username;
    @Value(value = "${rabbitmq.password:guest}")
    private String password;

    public RabbitMQConnectionPool() {
        initializePool();
    }

    public Connection getConnection() {
        try {
            return pool.borrowObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    //返回连接到连接池
    public void returnConnection(Connection connection) {
        if (connection != null) {
            try {
                pool.returnObject(connection); // 直接返回连接
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @PostConstruct
    private void initializePool() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setConnectionTimeout(30000); // 设置连接超时
            factory.setRequestedHeartbeat(60); // 设置心跳

            GenericObjectPoolConfig<Connection> config = new GenericObjectPoolConfig<>();
            config.setMaxTotal(10); // 设置最大连接数
            config.setMinIdle(2);   // 设置最小空闲连接数
            config.setBlockWhenExhausted(true); // 允许在连接池耗尽时等待
            config.setMaxWait(Duration.ofMillis(10000)); // 设置最大等待时间

            pool = new GenericObjectPool<>(new BasePooledObjectFactory<Connection>() {
                @Override
                public Connection create() throws Exception {
                    return factory.newConnection();
                }

                @Override
                public void destroyObject(PooledObject<Connection> pooledObject) throws Exception {
                    Connection conn = pooledObject.getObject();
                    if (conn != null) {
                        conn.close();
                    }
                }

                @Override
                public boolean validateObject(PooledObject<Connection> pooledObject) {
                    Connection conn = pooledObject.getObject();
                    return conn != null && conn.isOpen();
                }

                @Override
                public PooledObject<Connection> wrap(Connection conn) {
                    return new DefaultPooledObject<>(conn);
                }
            },config);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、发送处理器

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 发送类,外部可调用其中的发送方法
 */
@Component
public class RabbitMQSender {

    @Autowired
    private RabbitMQConnectionPool connectionPool;

    private final int MaxRetries = 5; // 最大重试次数

    public boolean sendMessage(String queueName, String message){
        return sendMessage(queueName,message,null);
    }
    /**
     * category:业务类型
     *
     * 发送时如果连接失败,自动重连,直至成功或重连次数超标
     */
    public boolean sendMessage(String queueName, String message,String category) {
        boolean ok = true;

        int attempt = 0;

        while (attempt < MaxRetries) {
            /**
             * 定义:Channel 是在一个 Connection 上创建的虚拟连接。
             * 作用:通道用于实际的消息传递操作,包括发送和接收消息、声明队列、交换机等。
             * 连接是底层的 TCP 连接,而通道是基于连接的轻量级虚拟连接,用于处理具体的消息传递操作。
             * 使用连接池来复用 Connection,同时为每个操作创建和关闭 Channel,可以提高性能和资源利用率。
             */
            Connection connection = null;
            Channel channel = null;
            try {
                connection = connectionPool.getConnection();
                if (connection == null) {
                    System.out.println("Failed to get connection, retrying...");
                    attempt++;
                    Thread.sleep(1000); // 等待一段时间后重试
                    continue;
                }

                channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
                System.out.println(String.format(" [%s] Sent to '%s',length:%d", category != null ? category : "x",
                        queueName, message.length()));
                break; // 发送成功后退出循环
            } catch (Exception e) {
                attempt++;
                System.out.println("An error occurred, retrying...");
                e.printStackTrace();
            } finally {
                // 确保通道和连接在这里被关闭
                try {
                    if (channel != null) {
                        channel.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (connection != null) {
                    // 返回连接到连接池,而不是关闭它
                    connectionPool.returnConnection(connection);
                }
            }
        }

        if (attempt >= MaxRetries) {
            ok = false;
            System.out.println("Failed to send message after " + MaxRetries + " attempts.");
        }

        return ok;
    }
}

3、调用示例

@Autowired
private RabbitMQSender rabbitMQSender;

if (!rabbitMQSender.sendMessage(QueueName, jsonStr, "测试信息")) {
    System.err.println("发送测试信息失败");
}

参考文章
Windows下开启rabbitMQ的图形界面
【RabbitMQ】超详细Windows系统下RabbitMQ的安装配置


原文地址:https://blog.csdn.net/leftfist/article/details/142586651

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!