自学内容网 自学内容网

项目 - 模拟实现消息队列

完整的代码在我的代码仓库:VampireMQ

1. 需求分析

本项目是基于SpringBoot、MyBatis以及SQLite,模拟实现消息队列,大部分功能是参考RabbitMQ来实现的

相关概念

消息队列有以下几个概念

1、生产者(Producer):生产者生产消息

2、消费者(Consumer):消费者消费消息

3、中间人(Broker):负责消息的存储和转发

4、发布(Publish):生产者把消息放到中间人上的过程叫发布

5、订阅(Subscribe):哪些消费者要从中间人这里取出消息,这个注册的过程叫订阅(并不是消费)

6、消费(Consume):消费者从中间人这里取出消息的动作

Broker内部涉及到的相关概念

1、虚拟主机(Virtual Host):类似于MySQL中的database,算是一个逻辑集合。

一个BrokerServer上可以组织多种不同种类的数据(消息),可以用VirtualHost做出逻辑区分。实际开发中,一个BrokerServer可能会同时管理多个业务线上的数据,可以用VirtualHost做出区分,每个VirtualHost分别放不同板块的数据

2、交换机(Exchange)

生产者把消息投递给BrokerServer实际上是先把消息交给BrokerServer上的某个交换机,接着再由交换机把消息转发给对应的队列

3、队列(Queue)

存储处理消息的实体,后续消费者也是从从队列中取出消息

4、绑定(Binding)

把交换机和队列之间建立关联关系,一个交换机可以对应多个队列,一个队列也能对应多个交换机。类似数据库中多对多关系,使用一个中间关联表来表示多对多关系,MQ中也是存在这样的中间表,而绑定就是中间表中的一条记录

5、消息(Message)

服务器A给B发送的请求可以是一个消息,服务器B给服务器A返回的响应也可以是一个消息。一个消息可以是一个字符串也可以是二进制数据,具体是什么样的,由程序员自定义。

在这里插入图片描述

核心api

消息队列服务器(BrokerServer)要提供的核心api,api名称及用法参考RabbitMQ。

1、创建队列(queueDeclare):不使用create术语,create只是单纯的创建,Declare的效果是不存在则创建,如果存在就啥也不干

2、销毁队列(queueDelete)

3、创建交换机(exchangeDeclare)

4、销毁交换机(exchangeDelete)

5、创建绑定(queueBind)

6、解除绑定(queueUnbind)

7、发布消息(basicPublish)

8、订阅消息(basicConsume)

MQ和消费者的工作模式:

1)Push(推):Broker把收到的数据主动发送给订阅的消费者(RabbitMQ只支持这种模式)

2)Pull(Pull):消费者主动调用Broker的api数据

所以我们没有设计消费消息的api

9、确认消息(basicAck):该api可以让消费者显示的告诉BrokerServer这个消息已经处理完毕(已读已回),从而提高系统的可靠性,保证消息处理没有遗漏。RabbitMQ也提供了否定确认,但是本次项目未实现

交换机类型

交换机转发消息有一套转发的规则,不同的规则对应不同的交换机类型

RabbitMQ实现了4种交换机类型(AMOP协议定义的)

1、Direct(直接交换机):生产者发送消息时,会指定一个“目标队列”的名字,交换机收到消息后会查看绑定的队列中是否有名字匹配的队列,如果有则将消息塞给对应的队列,如果没有则丢弃消息。

2、Fanout(扇出交换机):

在这里插入图片描述

3、Topic(主题交换机):

2个概念

1)bindingKey:队列和交换机绑定的时候指定一个单词(暗号)

2)routingKey:生产者发送的消息的时候也指定一个单词

如果当前routingKey和bindingKey能够对上暗号,则发送消息

在这里插入图片描述

4、Header:消息头交换机(规则复杂,应用场景少,不实现)

此次项目只实现1、2、3

持久化

虚拟主机、交换机、队列、绑定、消息,都需要持久化存储,我们采取的存储方式是:内存、硬盘都存储一份,内存为主,硬盘为辅。在内存中存储的原因:内存读取数据的效率高,保证能够高效转发消息。在硬盘中存储原因:防止内存中数据随着进程重启、主机重启导致的数据丢失。

网络通信

其他的生产者消费者(服务器)是通过网络与我们的BrokerServer进行交互的,此处我们使用TCP+自定义应用层协议来实现交互。自定义应用层协议的工作:让客户端可以通过网络,调用BrokerServer提供的api,api在客户端有一份,服务端也有一份,客户端远程调用api,发送请求给服务端,让服务端处理实际的业务。

在这里插入图片描述

远程方法调用:客户端调用了一个本地方法,这个方法背后给服务器给服务器发送了一系列的消息,由服务器完成了一系列工作,调用者并不知道背后的细节

在这里插入图片描述

除了上述9个方法,客户端还需要提供4个方法

1、建立Connection:一个Connection对象,代表一个TCP连接

2、关闭Connection

3、创建Channel:一个Connection中可包含多个Channel(通道),每个Channel上传输的数据是不相干的

4、关闭Channel

TCP建立、断开连接成本高,所以有了Channel的概念,每次要进行消息的传输就创建一个Channel,而不是创建TCP连接。

消息应答模式

应答模式分为2种

自动应答:消费者把消息取走了就算应答了

手动应答:消费者需要主动调用方法才算应答(basicAck方法就属于手动应答)

具体工作

1、对于生产者和消费者,编写客户端和服务器的网络通信部分,给客户端提供api,供客户端业务代码调用,实现远程调用。生产者的数据从哪来、消费者取到数据之后用来做什么,我们并不关心。

2、实现BrokerServer以及BrokerServer内部的基本概念(VirtualHost、Exchange、Queue、Binding、Message等)和核心api

3、对数据持久化存储

最终目标:能够给多个生产者消费者提供服务

模块划分

在这里插入图片描述

2. 设计核心类

交换机类型

采用枚举的方式表示交换机类型,0表示直接交换机,1表示扇出交换机,2表示主题交换机

在这里插入图片描述

交换机类

在这里插入图片描述

添加对应的get、set方法

绑定

没有持久化和自动删除属性的原因:binding是依附于Exchange和Queue的,对于持久化来说,如果 Exchange 和 Queue 任何一个都没有持久化,此时针对binding持久化是没有意义的,自动删除也是类似

在这里插入图片描述

消息队列MSGQueue

在这里插入图片描述

添加get、set方法

Message类

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消息的属性部分(BasicProperties)

因为需要存储到文件中,所以实现了Serializable接口,使之能够序列化和反序列化
在这里插入图片描述

3. 数据库设计

需要存储的内容有以下2个部分

1、数据库存储交换机、队列、绑定等

2、文件存储消息本身

因为消息不需要进行复杂的增删改查操作,所以数据库采用SQLite:轻量、简单

Java中使用SQLite,不需要额外安装,直接使用Maven依赖

<!--       sqlite依赖-->
<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.41.0.1</version>
</dependency>

相关配置

#项目名称
spring.application.name=VampireMQ

# SQLite的url./data/meta.db表示在当前工作目录下的data目录下的meta.db
spring.datasource.url=jdbc:sqlite:./data/meta.db

# 数据库驱动
spring.datasource.driver-class-name=org.sqlite.JDBC

# MyBatis相关配置
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl

mybatis.configuration.map-underscore-to-camel-case=true

# mybatis xml文件目录
mybatis.mapper-locations=classpath:/mapper/**Mapper.xml

编写持久层代码

本次项目采用的是MyBatis xml的方式进行数据库相关操作
在这里插入图片描述

@Mapper
public interface MetaMapper {
    //创建交换机表
    void createExchangeTable();

    //创建队列表
    void createQueueTable();

    //创建binding表
    void createBindingTable();

    //交换机插入删除
    void insertExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    //队列插入删除
    void insertQueue(MSGQueue queue);

    void deleteQueue(String queueName);

    //binding插入删除
    void insertBinding(Binding binding);

    void deleteBinding(Binding binding);

    //查找
    List<Exchange> selectAllExchanges();

    List<MSGQueue> selectAllQueues();

    List<Binding> selectAllBindings();


}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wpre.vampiremq.mqserver.MetaMapper">

<!--    插入交换机-->
    <insert id="insertExchange" parameterType="com.wpre.vampiremq.mqserver.core.Exchange">
        insert into exchange
        values (#{name}, #{type},#{durable},#{autoDelete},#{arguments})
    </insert>

<!--    插入队列-->
    <insert id="insertQueue" parameterType="com.wpre.vampiremq.mqserver.core.MSGQueue">
        insert into queue
        values (#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
    </insert>

<!--    插入binding-->
    <insert id="insertBinding" parameterType="com.wpre.vampiremq.mqserver.core.Binding">
        insert into binding
        values (#{exchangeName}, #{queueName}, #{bindingKey})
    </insert>


    <!--    创建exchange表-->
    <update id="createExchangeTable">
        create table if not exists exchange (
            name varchar(50) primary key,
            type int,
            durable boolean,
            auto_delete boolean,
            arguments varchar(1024)
        );
    </update>
    <!--    创建队列-->
    <update id="createQueueTable">
        create table if not exists queue (
            name varchar(50) primary key,
            durable boolean,
            exclusive boolean,
            auto_delete boolean,
            arguments varchar(1024)
            );
    </update>

    <update id="createBindingTable">
        create table if not exists binding (
            exchange_name varchar(50),
            queue_name varchar(50),
            binding_key varchar(50)
            );
    </update>


<!--    删除binding-->
    <delete id="deleteBinding" parameterType="com.wpre.vampiremq.mqserver.core.Binding">
        delete
        from binding
        where exchange_name = #{exchangeName}
          and queue_name = #{queueName}
    </delete>

<!--    删除交换机-->
    <delete id="deleteExchange" parameterType="java.lang.String">
        delete
        from exchange
        where name = #{exchangeName};
    </delete>

<!--    删除队列-->
    <delete id="deleteQueue" parameterType="java.lang.String">
        delete
        from queue
        where name = #{queueName}
    </delete>

<!--    查询所有的交换机-->
    <select id="selectAllExchanges" resultType="com.wpre.vampiremq.mqserver.core.Exchange">
        select * from exchange;
    </select>
<!--    查询所有的队列-->
    <select id="selectAllQueues" resultType="com.wpre.vampiremq.mqserver.core.MSGQueue">
        select * from queue;
    </select>
<!--    查询所有的绑定-->
    <select id="selectAllBindings" resultType="com.wpre.vampiremq.mqserver.core.Binding">
        select * from binding;
    </select>
</mapper>

创建DataBaseManager类,对数据库操作进行封装

数据库初始化:初始化之前,需要先从Spring中获取到MetaMapper,一般来说是使用Autowired注解获取,但是我们不想把DataBaseManager这个类交给Spring管理,所以在Application类中添加如下代码
在这里插入图片描述

//数据库初始化(建库建表,插入默认数据)
public void init() {
    //从Spring中获取到metaMapper
    metaMapper = VampireMqApplication.context.getBean(MetaMapper.class);
    if (!checkDBExists()) {
        //如果数据库不存在,进行建库建表操作
        //创建库: 对于SQLite来说,创建数据库就是创建一个文件
        File dataDir = new File("./data");
        dataDir.mkdirs();

        //创建表
        createTable();
        createDefaultData();
    } else {
        //数据库已经存在
        log.info("数据库已经存在!!!");
    }

}

//插入默认数据
private void createDefaultData() {
    //创建默认的交换机,RabbitMQ中有一个匿名的交换机,类型是DIRECT
    Exchange exchange = new Exchange();
    exchange.setName("");
    exchange.setType(ExchangeType.DIRECT);
    exchange.setDurable(true);
    exchange.setAutoDelete(false);
    metaMapper.insertExchange(exchange);
    log.info("创建初始数据成功!!!");
}

//建表操作,不需要创建数据库(meta.db),
private void createTable() {
    metaMapper.createExchangeTable();
    metaMapper.createQueueTable();
    metaMapper.createBindingTable();
    log.info("建表成功!!!");
}

public void deleteDB() {
    File file = new File("./data/meta.db");

    //删除文件
    boolean delete = file.delete();
    if (delete) {
        log.info("删除数据库成功");
    } else {
        log.error("删除数据库失败");
    }

    File dataDir = new File("./data");
    //删除目录
    boolean ret = dataDir.delete();
    if (ret) {
        log.info("删除目录成功");
    } else {
        log.error("删除目录失败");
    }
}

//检查数据库是否存在
private boolean checkDBExists() {
    File file = new File("./data/meta.db");
    return file.exists();
}

接着封装其他的操作
在这里插入图片描述

接着对代码进行单元测试

在这里插入图片描述

4. 消息持久化

消息并不涉及到复杂的增删改查,消息的数量可能会非常多,数据库的访问效率不是很高,所以考虑直接把消息存储到文件中。

消息的存储结构

在data目录中创建一些子目录,子目录名称就是队列的名称,每个队列的子目录下再分配2个文件:queue_data.txt、queue_stat.txt,queue_data.txt保存消息的内容,queue_stat.txt保存消息的统计信息
在这里插入图片描述

消息的存储

queue_data 是存储消息的文件,这个文件中存储了多个消息,每个消息都以二进制形式进行存储,结构如下
在这里插入图片描述

如果消息长度中的值为20,消息的二进制数据就占20,例如
在这里插入图片描述

消息的二进制数据如何存储?结构如下
在这里插入图片描述

basicProperties表示消息的属性部分,body才是消息的本体,而isValid表示消息是否有效。

关于offsetBeg和offsetEnd:

Message对象在内存中存储一份,硬盘上也存储一份,offsetBeg和offsetEnd不会在硬盘中记录,而是在内存中存储,只需要找到内存中的offsetBeg和offsetEnd,这样也能找到硬盘上对应的Message对象
在这里插入图片描述

消息的垃圾回收

对应BrokerServer来说,消息需要新增和删除,生产者生产消息BrokerServer就新增一个消息,消费者消费一个消息BrokerServer就删除一个消息。删除我们使用的是逻辑删除,isValid字段为1表示有效消息,为0表示无效消息(已经被删除),但是随着时间的推移,存储消息的文件可能越来越大,里面可能大部分是无效的消息,此时我们需要对文件进行垃圾回收。

设计思想:遍历原有的文件,把所有的有效的消息数据拷贝到一个新的文件中,再把旧的文件删除

触发垃圾回收的时机:当总消息数目超过1999,并且有效消息的数目低于总消息数目的30%

消息的总数目和有效消息的数目,从消息的统计信息的文件(queue_stat.txt)中获取,这个文件的内容只有一行,形式如下

消息的总数目\t有效消息的数目

例如

1999\t1000

消息文件的拆分与合并(未实现)

如果某个消息队列中消息非常多,而且全是有效消息,此时对这个文件的各种操作(比如进行一次GC)会非常耗时,

解决方案:

1、文件拆分:当单个文件长度达到某个阈值后,就会把大的文件拆分成若干个小文件

2、文件合并:每个文件都会单独进行GC,如果GC之后发现文件变小很多,就和相邻的文件进行合并

思路:

1、引入专门的数据结构存储当前队列有多少个文件,每个文件大小是多少,消息数目是多少,无效消息有多少

2、设计策略,何时触发文件拆分与合并

5. 消息文件的管理

定义MessageFileManager类,用于管理消息,在该类中创建一个内部类Stat,表示该队列的统计信息
在这里插入图片描述

接着在MessageFileManager类中创建如下方法

在这里插入图片描述

消息统计文件的读写

根据队列名称,读取到消息统计文件(也就是消息的总数和合法消息数)
在这里插入图片描述

根据队列名称和Stat对象,将统计信息写入文件
在这里插入图片描述

另外再编写createQueueFiles、deleteQueueFile、checkFilesExist,分别用于创建队列对应的文件和目录、删除队列对应的目录和文件、检查队列的目录和文件是否存在
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6. 消息序列化

序列化:把对象转换成字符串/字符数组,反序列化:把字符串/字符数组转换成对象

消息存储的byte[] body是二进制数据,不适合使用JSON进行序列化,所以本次我们使用二进制的序列化方式

Java标准库中的ObjectInputStream和ObjectOutputStream进行二进制序列化

实现序列化的前提:需要实现Serializable

创建一个BinaryTool类,在这个类中实现序列化和反序列化的方法

序列化:
在这里插入图片描述

反序列化:

在这里插入图片描述

7. 把消息写入文件

需要注意以下几点:

1)写入前,检查要写入的队列对应的文件是否存在

2)写入前,把消息序列化

3)写入前,获取当前队列的数据文件长度,从而计算出Message对象的offsetBeg和offsetEnd的值,接着设置这两个值

4)写入时,先写消息的长度,再写消息的本体,消息的长度是int类型,需要把这4个字节都写入到文件,因为OutputStream的write方法一次写入的是一个字节,所以我们使用DataOutputStream的writeInt方法

5)最后更新消息统计文件中的总消息数和有效消息数

6)如果多个线程同时对一个队列进行读写,可能会出现线程安全问题,所以需要对这个操作进行加锁,锁对象是队列本身

/**
 * 把新的消息,放到对应的队列中
 * @param queue 要写入的队列
 * @param message 要写的消息
 */
public void sendMessage(MSGQueue queue ,Message message) throws IOException {
    // 1.检查要写入的队列对应的文件是否存在
    if (!checkFilesExist(queue.getName())) {
        throw new MQException("要写入的队列文件不存在");
    }

    // 2.写入文件前,先把Message序列化

    byte[] messageBinary = BinaryTool.toBytes(message);
    synchronized (queue) {
        // 3.写入文件前,获取当前队列数据文件的长度,从而计算出Message对象的offsetBeg 和 offsetEnd
        //(只有在写文件的时候,才能知道offsetBeg 和 offsetEnd)
        // 新的 Message 数据写入到队列数据文件的末尾,此时的Message对象的offsetBeg 就是当前文件长度+4 ,offsetEnd就是当前文件长度+4+Message自身长度
        File queueDataFile = new File(getQueueDataPath(queue.getName()));
        //获取文件长度queueDataFile.length();
        //设置offsetBeg和offsetEnd
        message.setOffsetBeg(queueDataFile.length() + 4);//当前文件长度+4
        message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);//当前文件长度+4+Message自身长度

        // 4. 写入消息到数据文件中
        try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
            //参数设置true表示追加写入
            try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                // 1. 先写消息的长度(4个字节)
                //messageBinary.length 是一个int 类型, 需要把它的4个字节依次写入到文件中
                //使用OutputStream 的write方法其实写入的是byte类型的,所以我们选择DataOutputStream
                dataOutputStream.writeInt(messageBinary.length);
                // 2. 写消息本体
                dataOutputStream.write(messageBinary);

            }
        }

        // 5.更新消息统计文件
        Stat stat = readStat(queue.getName());
        stat.totalCount += 1;
        stat.validCount += 1;
        writeStat(queue.getName(), stat);
    }
}

8. 删除消息

这里的删除是指逻辑删除,只是把isValid设置为0

删除的前提是offsetBeg和offsetEnd是有效的,具体的步骤如下:

1、先把文件中要删除的这段数据读取出来,还原成Message对象

2、把isValid设置成0

3、把修改后的数据写回文件中

这里的读取,使用的是RandomAccessFile,支持随机读取

//删除消息,逻辑删除,把isValid设置为0即可
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {

    synchronized (queue) {
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
            //先把文件中这段数据读取出来,还原成Message对象
            byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
            //移动光标
            randomAccessFile.seek(message.getOffsetBeg());
            //把数据读取到bufferSrc数组中
            randomAccessFile.read(bufferSrc);
            //转换成Message对象
            Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
            //把isValid设置为0即可
            diskMessage.setIsValid((byte) 0x0);

            //把数据重新写回文件
            byte[] bufferDest = BinaryTool.toBytes(diskMessage);

            //重新移动光标到offsetBeg(光标会随着读写进行变化)
            randomAccessFile.seek(message.getOffsetBeg());
            //更新后的消息写回文件
            randomAccessFile.write(bufferDest);
        }
        //更新统计文件中的有效消息个数
        Stat stat = readStat(queue.getName());
        if (stat.validCount > 0) {
            stat.validCount -= 1;
        }
        writeStat(queue.getName(), stat);
    }

}

9. 将消息从文件中加载到内存中

思路就是循环读取消息,把消息保存到链表中

/**
 * 从文件中读取出所有的消息,加载到内存中(保存在链表中),这个方法在程序启动时调用
 * 参数是queueName,而不是MSGQueue对象,是因为这个方法不需要加锁,不涉及多线程操作(因为是程序启动的时候调用的)
 *
 * @param queueName
 * @return LinkedList方便头插头删
 */
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, ClassNotFoundException {
    LinkedList<Message> messages = new LinkedList<>();
    try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
        try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
            long currentOffset = 0;//记录当前文件光标位置
            while (true) {
                //循环读取
                //1.读取当前消息的长度
                int messageLen = dataInputStream.readInt();//读取4个字节
                //readInt读到文件末尾会抛出异常,只需要捕获异常

                //2.根据这个长度,读取消息数据
                byte[] buffer = new byte[messageLen];
                int readLen = dataInputStream.read(buffer);
                if (readLen != messageLen) {
                    //如果实际读取的消息长度不等于当前消息长度
                    throw new MQException("loadAllMessageFromQueue 文件格式错误");
                }
                //3. 把读取的二进制数据转换成Message对象
                Message message = (Message) BinaryTool.fromBytes(buffer);

                //4.判断是否是无效数据
                if (message.getIsValid() != 0x1) {
                    //跳过无效数据
                    currentOffset += (4 + messageLen);//更新光标
                    continue;
                }
                //5. 设置Message对象的offset属性
                message.setOffsetBeg(currentOffset + 4);
                message.setOffsetEnd(currentOffset + 4 + messageLen);
                currentOffset += (4 + messageLen);//更新光标
                //6. 把有效数据加入到链表中
                messages.add(message);

            }

        } catch (EOFException e) {
            e.printStackTrace();
        }
    }
    return messages;

}

10. 实现垃圾消息回收

1、检查当前的队列是否需要进行垃圾回收(根据前面约定的触发垃圾回收机制的时机:当总消息数目超过1999,并且有效消息的数目低于总消息数目的30%)

2、实现思路:遍历原有的文件,把所有的有效的消息数据拷贝到一个新的文件中,再把旧的文件删除

//检查当前队列是否需要进行垃圾回收
public boolean checkGC(String queueName) {
    Stat stat = readStat(queueName);
    if (stat.totalCount > 1999 &&
            (double) stat.validCount / (double) stat.totalCount < 0.3) {
        return true;
    }
    return false;
}

//获取新的文件路径
private String getQueueDataNewPath(String queueName) {
    return getQueueDir(queueName) + "/queue_data_new.txt";
}

//垃圾回收
public void GC(MSGQueue queue) throws IOException, ClassNotFoundException {
    //把有效数据拷贝到新文件中,删除旧文件,再把新的文件重命名,更新消息统计文件
    synchronized (queue) {
        //创建新文件
        File newDataFile = new File(getQueueDataNewPath(queue.getName()));
        if (newDataFile.exists()) {
            //如果这个文件存在,说明上次GC出问题
            throw new MQException("GC出现问题");
        }
        boolean ok = newDataFile.createNewFile();
        if (!ok) {
            throw new IOException("GC时创建文件失败");
        }
        //2. 从旧的文件读取所有的有效消息对象
        LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

        //3. 把有效消息写入新的文件中
        try (OutputStream outputStream = new FileOutputStream(newDataFile)) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                for (Message message : messages) {
                    byte[] bytes = BinaryTool.toBytes(message);
                    //先写4个字节的长度
                    dataOutputStream.writeInt(bytes.length);
                    dataOutputStream.write(bytes);
                }
            }

        }
        //4.删除旧文件,重命名新文件
        File oldDataFile = new File(getQueueDataPath(queue.getName()));
        boolean delete = oldDataFile.delete();
        if (!delete) {
            throw new MQException("删除旧数据文件失败");
        }
        //重命名
        boolean b = newDataFile.renameTo(oldDataFile);
        if (!b) {
            throw new MQException("重命名失败");
        }
        //更新消息统计文件
        Stat stat = readStat(queue.getName());
        //垃圾回收之后,全部都是有效消息
        stat.totalCount = messages.size();
        stat.validCount = messages.size();

    }

}

11. 封装数据库操作与数据文件操作

创建DiskDataCenter类,这个类主要是对之前写的数据库相关操作和数据文件的方法进行封装,给上层逻辑进行调用。

交换机操作以及初始化工作:

在这里插入图片描述

队列操作:

在这里插入图片描述

绑定操作:
在这里插入图片描述

消息操作:

在这里插入图片描述

12. 内存数据管理

用硬盘存储数据,只是为了保证数据部不丢失,而MQ主要的存储方式还是使用内存,因为内存读取速度更快。

存储的策略:

交换机:使用HashMap,key是交换机名字,value是Exchange对象

队列:使用HashMap,key是队列名,value是MSGQueue对象

绑定:使用嵌套的HashMap,key是绑定的名字,value是一个HashMap<队列名,绑定对象>

消息:使用HashMap,key是messageId,value是Message对象

队列和消息之间的关联:使用嵌套的HashMap,key是队列名,value是一个LinkedList<Message对象>

表示未被确认的消息:使用嵌套的HashMap,key是队列名,value是一个LinkedList<Message对象>

public class MemoryDataCenter {
    //存储交换机,key 是 exchangeName,value 是Exchange对象
    //可能会在多线程环境下使用这个类中的方法,所以使用ConcurrentHashMap,
    private ConcurrentHashMap<String, Exchange> exchangeMap
            = new ConcurrentHashMap<>();

    //存储队列,key是queueName,value是MSG对象
    private ConcurrentHashMap<String, MSGQueue> queueMap
            = new ConcurrentHashMap<>();

    //存储绑定,第一个key是exchangeName,第二个key是queueName
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindings
            = new ConcurrentHashMap<>();

    //存储消息,key是消息messageId,value是Message对象
    private ConcurrentHashMap<String, Message> messageMap
            = new ConcurrentHashMap<>();

    //存储队列和消息的关联关系,key是queueName,value是Message链表
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap
            = new ConcurrentHashMap<>();

    //存储未被确认的消息
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitACKMap
            = new ConcurrentHashMap<>();

}

接着,在该类中添加下面方法

交换机相关操作:
在这里插入图片描述

队列相关操作:
在这里插入图片描述

绑定相关操作:

在这里插入图片描述

消息相关操作:
在这里插入图片描述

从队列中获取消息:

在这里插入图片描述

未确认消息相关操作:

在这里插入图片描述

最后添加一个recovery方法,用于从硬盘中读取数据,恢复到内存中

//从硬盘中读取数据,恢复到内存中
public void recovery(DiskDataCenter dataCenter) throws IOException, ClassNotFoundException {

    //先清空内存中原来的数据
    exchangeMap.clear();
    queueMap.clear();
    bindingsMap.clear();
    queueMessageMap.clear();
    messageMap.clear();
    //1.恢复交换机
    //查询所有的交换机
    List<Exchange> exchanges = dataCenter.selectAllExchanges();
    //循环插入
    for (Exchange exchange : exchanges) {
        exchangeMap.put(exchange.getName(), exchange);
    }

    //2.恢复队列
    List<MSGQueue> msgQueues = dataCenter.selectAllQueues();
    for (MSGQueue queue : msgQueues) {
        queueMap.put(queue.getName(), queue);
    }

    //3.恢复绑定
    List<Binding> bindings = dataCenter.selectAllBindings();
    for (Binding binding : bindings) {
        //
        ConcurrentHashMap<String, Binding> bindingConcurrentHashMap =
                bindingsMap.computeIfAbsent(binding.getExchangeName(),//先根据ExchangeName查出对应的Map,如果没有就创建出来
                        k -> new ConcurrentHashMap<>());
        //上述操作,会在bindingsMap中创建一个ConcurrentHashMap<String, Binding>,也就是bindingConcurrentHashMap
        //同时这个bindingConcurrentHashMap的key会被设置成exchangeName,所以后续只需要将value设置一下就好了
        //设置value,value也是一个ConcurrentHashMap
        bindingConcurrentHashMap.put(binding.getQueueName(), binding);
    }

    //4.恢复消息数据
    //遍历所有的队列,根据每个队列名,获取所有的消息,然后插入
    for (MSGQueue queue : msgQueues) {
        // 根据队列名加载出所有的消息
        LinkedList<Message> messages =
                dataCenter.loadAllMessageFromQueue(queue.getName());
        //把消息加入到queueMessageMap
        queueMessageMap.put(queue.getName(), messages);
        //再插入到messageMap中
        for (Message message : messages) {
            messageMap.put(message.getMessageId(), message);
        }
    }

    //未确认的消息不需要从硬盘中恢复,毕竟之前硬盘存储的设计时,没有考虑这一点
    //等待确认的过程中,一旦服务重启,未被确认的消息又会变成未被取走的状态

}

13. 虚拟主机设计

虚拟主机类似MySQL的database,将交换机、队列、绑定、消息进行逻辑上的区分

需求分析:

1、本次只实现单个虚拟主机

2、实现核心api供上层代码调用

核心api如下:

  • 创建队列(queueDeclare)

  • 销毁队列(queueDelete)

  • 创建交换机(exchangeDeclare)

  • 销毁交换机(exchangeDelete)

  • 创建绑定(queueBind)

  • 解除绑定(queueUnbind)

  • 发布消息(basicPublish)

  • 订阅消息(basicConsume)

  • 确认消息(basicAck)

3、把前面写过的内存和硬盘数据管理相关方法,与核心api进行串联

14. 虚拟主机的实现

首先创建VirtualHost类

并且创建如下方法

在这里插入图片描述

为了能够保证隔离性,我们约定在VirtualHost中的核心api里面,需要对exchangeName和queueName进行转换,具体的转换方案是:exchangeName = VirtualHostName+exchangeName ,queueName = VirtualHostName

创建队列(queueDeclare)

//创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) {
    queueName = virtualHostName + queueName;

    try {
        synchronized (queueLocker) {
            //从内存中查询,如果存在直接返回
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue != null) {
                return true;
            }
            //如果不存在,就创建一个
            MSGQueue newQueue = new MSGQueue();
            newQueue.setName(queueName);
            newQueue.setDurable(durable);
            newQueue.setExclusive(exclusive);
            newQueue.setAutoDelete(autoDelete);
            newQueue.setArguments(arguments);

            //在硬盘上创建(如果需要持久化)
            if (durable) {
                diskDataCenter.insertQueue(newQueue);
            }
            //写入内存
            memoryDataCenter.insertQueue(newQueue);
            log.info("队列创建成功");
        }
        return true;
    } catch (Exception e) {

    }
    return false;
}

销毁队列(queueDelete)

//销毁队列
public boolean queueDelete(String queueName) {
    queueName = virtualHostName + queueName;
    try {
        synchronized (queueLocker) {
            //从内存中查询
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null) {
                throw new MQException("要删除的队列不存在");
            }
            //硬盘中删除
            if (queue.isDurable()) {
                diskDataCenter.deleteQueue(queueName);
            }
            //从内存中删除
            memoryDataCenter.deleteQueue(queueName);
        }
        return true;

    } catch (Exception e) {
        return false;
    }
}

创建交换机(exchangeDeclare)

// 创建交换机(如果交换机不存在就创建,如果存在就直接返回,创建成功返回true,创建失败返回false)
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType,
                               boolean durable, boolean autoDelete, Map<String, Object> arguments) {
    //转换交换机名称
    exchangeName = virtualHostName + exchangeName;

    try {
        synchronized (exchangeLocker) {
            //1.判断该交换机是否存在,从内存中查询
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange != null) {
                //已经存在
                return true;
            }
            //2.创建交换机
            Exchange newExchange = new Exchange();
            newExchange.setName(exchangeName);
            newExchange.setDurable(durable);
            newExchange.setType(exchangeType);
            newExchange.setAutoDelete(autoDelete);
            newExchange.setArguments(arguments);
            //3. 把交换机写入硬盘(前提是需要持久化)
            if (durable) {
                diskDataCenter.insertExchange(newExchange);
            }
            //4.把交换机写入内存
            memoryDataCenter.insertExchange(newExchange);
            log.info("交换机创建成功");
        }
        return true;
    } catch (Exception e) {
        return false;
    }
}

销毁交换机(exchangeDelete)

//销毁交换机
public boolean exchangeDelete(String exchangeName) {
    exchangeName = virtualHostName + exchangeName;

    try {
        synchronized (exchangeLocker) {
            //从内存中查询
            Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
            if (toDelete == null) {
                throw new MQException("交换机不存在");
            }
            //删除硬盘上的交换机
            if (toDelete.isDurable()) {
                diskDataCenter.deleteExchange(exchangeName);
            }
            //删除内存中的交换机
            memoryDataCenter.deleteExchange(exchangeName);
            log.info("交换机删除成功");
        }
        return true;
    } catch (Exception e) {

    }
    return false;
}

创建绑定(queueBind)

//创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
    queueName = virtualHostName + queueName;
    exchangeName = virtualHostName + exchangeName;

    try {
        synchronized (queueLocker) {
            synchronized (exchangeLocker) {
                //1.判断绑定是否已经存在
                Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
                if (binding != null) {
                    //已经存在了
                    throw new MQException("binding已经存在!!!");
                }
                //2.判断bindingKey的合法性
                if (!router.checkBindingKey(bindingKey)) {
                    throw new MQException("bindingKey不合法");
                }
                //创建一个binding对象
                Binding newBinding = new Binding();
                newBinding.setExchangeName(exchangeName);
                newBinding.setQueueName(queueName);
                newBinding.setBindingKey(bindingKey);
                //获取对应的交换机和队列,如果对应的队列和交换机不存在,绑定是无法创建的
                Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                MSGQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MQException("对应的队列不存在");
                }
                if (exchange == null) {
                    throw new MQException("对应的交换机不存在");
                }

                //插入硬盘中
                if (queue.isDurable() && exchange.isDurable()) {
                    diskDataCenter.insertBinding(newBinding);
                }
                //插入内存中
                memoryDataCenter.inertBinding(newBinding);
            }
        }
        return true;
    } catch (Exception e) {
        return false;
    }

}

解除绑定(queueUnbind)

//解除绑定
public boolean queueUnbind(String queueName, String exchangeName) {
    queueName = virtualHostName + queueName;
    exchangeName = virtualHostName + exchangeName;

    try {
        synchronized (queueLocker) {
            synchronized (exchangeLocker) {
                //从内存中查询,看是否存在
                Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
                if (binding == null) {
                    throw new MQException("删除绑定失败");
                }
                //直接删除:无论是不是需要持久化,都删除
                //从硬盘中删除
                memoryDataCenter.deleteBinding(binding);
                //从内存中删除
                memoryDataCenter.deleteBinding(binding);
            }
        }
        return true;
    } catch (Exception e) {
        return false;
    }

}

发布消息(basicPublish)

//发送消息到指定队列中(发布消息)
public boolean basicPublish(String exchangeName, String routingKey,
                            BasicProperties basicProperties, byte[] body) {
    try {
        //1.转换交换机名称
        exchangeName = virtualHostName + exchangeName;
        //2.校验routingKey
        if (!router.checkRoutingKey(routingKey)) {
            throw new MQException("RoutingKey不合法");
        }

        //查找交换机
        Exchange exchange = memoryDataCenter.getExchange(exchangeName);
        if (exchange == null) {
            throw new MQException("交换机不存在");
        }
        //判断交换机类型
        if (exchange.getType().equals(ExchangeType.DIRECT)) {
            //直接交换机
            //以routingKey作为队列名,直接把消息写入指定的队列中
            String queueName = virtualHostName + routingKey;//需要构造一下队列名

            //构造消息对象
            Message message = Message.createMessageWithId(routingKey, basicProperties, body);

            //查找队列对象
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null) {
                throw new MQException("队列不存在");
            }
            //队列存在,直接往队列写入消息
            sendMessage(queue, message);

        } else {
            //FANOUT和TOPIC交换机
            //1.找到与该交换机关联的所有绑定
            ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
            for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
                //遍历所有的绑定对象
                Binding binding = entry.getValue();
                //从内存中获取到对应的队列
                MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                if (queue == null) {
                    continue;
                }

                //构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);

                //判断这个消息是否能转发给队列,如果是FANOUT,所有的队列都要转发,如果是TOPIC,需要判断routingKey和BindingKey是否匹配
                if (!router.route(exchange.getType(), binding, message)) {
                    continue;
                }

                //真正转发消息给队列
                sendMessage(queue, message);

            }
        }
        return true;
    } catch (Exception e) {
        return false;
    }
}

//往队列中写入消息(转发)
private void sendMessage(MSGQueue queue, Message message) throws IOException {
    //发送消息,就是把消息写入硬盘和内存
    int deliverMode = message.getDeliverMode();
    // deliverMode为1表示不持久化,为2表示持久化
    if (deliverMode == 2) {
        //写入硬盘
        diskDataCenter.sendMessage(queue, message);
    }
    //写入内存
    memoryDataCenter.sendMessage(message, queue);
}

创建Router类,用于实现交换机的转发规则,并且验证bindingKey的合法性
在这里插入图片描述

交换机类型匹配规则如下

在这里插入图片描述

订阅消息(basicConsume)

在这里插入图片描述

Consumer是一个函数式接口(在别的语言中叫回调函数),后续调用basicConsume方法传参时就可以写lambda表达式

消费者推送消息的思路:

1、让broker server 把有哪些消费者管理好

2、收到对应消息,把消息推送给消费者

消费者调用basicConsume就是订阅某个指定队列的消息

所以,我们需要在MSGQueue中加入一个List类型的属性,存储消费者,为了描述消费者,我们再创建一个ConsumerEnv类
在这里插入图片描述

确认消息(basicAck)

在这里插入图片描述

消费消息的核心逻辑

让线程池执行对应消费者中的回调函数,另外添加一个扫描线程,用于让线程池知道要执行的是哪个回调函数以及是哪个队列的哪个消息

public class ConsumerManager {
    private VirtualHost virtualHost;//用于操作数据

    //线程池,用于执行具体的回调任务
    private ExecutorService wokePoll = Executors.newFixedThreadPool(4);

    //令牌队列,每次拿到一个令牌,才从对应的队列中取一个消息
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();

    //扫描线程
    private Thread scannerThread = null;

    public ConsumerManager(VirtualHost virtualHost) {
        this.virtualHost = virtualHost;
        scannerThread = new Thread(() -> {
            //扫描线程要完成的工作
            while (true) {
                try {
                    //1.拿到令牌
                    String queueName = tokenQueue.take();
                    //2.根据令牌找到队列
                    MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null) {
                        throw new MQException("根据令牌取队列,发现队列不存在");
                    }
                    //3.从队列中消费消息
                    synchronized (queue) {
                        consumeMessage(queue);
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        scannerThread.setDaemon(true);//设置为后台线程
        scannerThread.start();
    }

    //通知 去进行消费
    public void notifyConsume(String queueName) throws InterruptedException {
        //把队列名加到令牌队列中
        tokenQueue.put(queueName);
    }
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        //1.找到对应的队列
        MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
        if (queue == null) {
            throw new MQException("队列不存在");
        }
        //2.构造ConsumerEnv对象
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, consumer, queueName, autoAck);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);//添加一个消费者

            int messageCount = virtualHost.getMemoryDataCenter().getMessageCount(queueName);
            //查看有多少消息,如果有消息就先消费
            for (int i = 0; i < messageCount; i++) {
                //调用一次就消费一条消息
                consumeMessage(queue);
            }

        }
    }
    //消费消息
    private void consumeMessage(MSGQueue queue) {
        //1.按照轮询方式,找个消费者来,进行消费
        ConsumerEnv consumerEnv = queue.chooseConsumer();
        if (consumerEnv == null) {
            //当前队列没有消费者(订阅者)
            return;
        }
        //2.从队列中取出一个消息
        Message message = virtualHost.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null) {
            //当前队列没有消息
            return;
        }

        //3.把消息带人回调函数,让线程池执行
        wokePoll.submit(() -> {
            try {
                //为了确保消息不丢失,先把消息放入待确认消息的集合中
                virtualHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                //执行回调操作
                consumerEnv.getConsumer().handleDelivery(consumerEnv.getConsumerTag(), message.getBasicProperties(), message.getBody());
                //如果是自动应答,直接删除消息
                //如果是手动应答,后续等消费者调用basicAck后删除
                if (consumerEnv.isAutoAck()) {
                    //删除硬盘消息
                    if (message.getDeliverMode() == 2) {
                        virtualHost.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    //删除待确认集合消息
                    virtualHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    //删除内存中的消息
                    virtualHost.getMemoryDataCenter().removeMessage(message.getMessageId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

15. 网络通信

基于TCP,自定义应用层协议,协议格式如下

在这里插入图片描述

type : 表示当前请求和响应要进行的操作(VirtualHost中的核心api),取值如下

  • 0x1 创建channel
  • 0x2 关闭channel
  • 0x3 创建exchange
  • 0x4 销毁exchange
  • 0x5 创建queue
  • 0x6 销毁queue
  • 0x7 创建binding
  • 0x8 销毁binding
  • 0x9 发送message
  • 0xa 订阅message
  • 0xb 返回ack
  • 0xc 服务器返回给客户端推送的消息

length:表示payload的长度

payload:根据当前是请求还是响应,以及当前的type进行取值,例如:type是0x3,同时是一个请求,此时payload中的内容就是exchangeDeclare的参数序列化的结果;type是0x3,同时是一个响应,此时payload中的内容就是exchangeDeclare的返回结果的序列化内容

创建BrokerServer类,创建如下的方法
在这里插入图片描述

16. 客户端

3个核心类

1、ConnectionFactory:连接工厂,主要功能是创建出连接Connection对象
在这里插入图片描述

2、Connection:表示一个TCP连接
在这里插入图片描述

3、Channel:表示一个逻辑上的连接

在这里插入图片描述

创建如下方法

在这里插入图片描述

17. 收尾工作

编写demo程序
生产者类

public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        // 创建一个消息并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投递完成! ok=" + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

消费者类

public class DemoConsumer {
    public static void main(String[] args) throws IOException, MQException, InterruptedException {
        System.out.println("启动消费者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MQException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

        // 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
        while (true) {
            Thread.sleep(500);
        }

    }
}

在启动类中,调用BrokerServer中的start方法
在这里插入图片描述
运行程序
在这里插入图片描述
启动生产者类,生产消息
在这里插入图片描述
接着启动消费者类,测试消费消息
在这里插入图片描述
OK~ 非常的完美~


原文地址:https://blog.csdn.net/QUIXOTIC_/article/details/145228033

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!