自学内容网 自学内容网

项目第五弹:队列消息管理模块

项目第五弹:队列消息管理模块

首先要说明的是:
我们的基础版本的项目是只实现了消息推送的功能,暂不支持消息拉取。
消息拉取功能,我们留到扩展版本在实现

一、消息如何组织并管理

消息是依附于队列而存在的,因此我们的消息管理模块是以队列为单位进行管理的

消息也要持久化,不过它只是为了备份,无需查询操作,而新增和删除的操作较为频繁,且消息数量较多,因此我们不将消息存到数据库当中,而是存到普通二进制文件当中

1.消息结构体

我们的消息结构体之前在proto文件当中已经编写完毕:

// 3. 消息的基本属性
message BasicProperities
{
    string msg_id = 1;
    DeliveryMode mode = 2;
    string routing_key = 3;
}


// 4. 消息结构体
//为了便于管理消息:
//      1. 有效载荷(持久化在文件当中的)
//              属性
//              消息内容
//      2. 管理字段
//              是否有效
//              偏移量
//              消息长度
message Message 
{
    message ValidLoad
    {
        string body = 1;
        BasicProperities properities = 2;
        string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效
    }
    ValidLoad valid = 1;
    uint64 offset = 2;
    uint64 len = 3;  
}

2.消息持久化管理模块设计

1.数据消息文件名

因为消息是以队列为单位进行管理的,所以我们想要将队列名和消息文件名产生关联(方便管理)

所以我们规定:

消息文件名 = 队列名.data

2.临时消息文件名

我们之前说过,我们往文件当中写入的是消息的有效载荷:

message ValidLoad
{
    string body = 1;
    BasicProperities properities = 2;
    string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效
}

其中valid字段用于我们消息的伪删除。
如果只采用伪删除:消息会越来越多,消息文件会越来越大

首先我们想到的是:
能不能就像闭散列哈希表那样,把DELETE的位置占掉,放上待插入数据
这个方法在我们文件版本伪删除法当中不现实,
因为每个消息的长度大概率都不一样,覆盖式写入会产生空隙位置,会使得文件读写产生隐患,不符合代码健壮性的原则
而且这么来写,效率太低,代码也较为复杂

那怎么办呢?
我们可以借助Java当中垃圾回收器(Garbage Collection)的思想
设置一个水位线(标准):当消息文件中消息总数超过2000,且无效消息超过一半时,进行垃圾回收

进行垃圾回收时其实就是依次读取数据消息文件当中的有效消息,并写入临时文件当中,然后把消息文件删除,将临时文件重命名为数据消息文件名

临时文件名:队列名.temp

3.对外接口与包含成员

对外接口:

  • 创建/删除消息文件
  • 新增消息
  • 删除消息
  • 垃圾回收(消息恢复)

包含成员:

  • 数据消息文件名
  • 临时消息文件名
  • 队列名

目前就只有一个问题了:文件是面向字节流的,因此会有数据包粘包问题,所以我们要自定义应用层协议来解决文件读写的粘包问题:

二、自定义应用层协议解决文件读写的粘包问题

1.Length-Value协议

8字节的消息长度+消息本身

我们的消息长度就用我们常用的size_t类型了,32位平台下是4字节
64位平台下是8字节

我们用的Linux版本大多都是编程默认就是在64位平台的
在这里插入图片描述

三、队列消息管理模块设计

1.待确认消息哈希表

我们要解决的问题是:

我们的消息在内存当中采用什么数据结构来存储?

这就要对消息进行分类了:
因为我们的消息队列是支持消息推送与消息确认的,因此消息就分为:

  1. 待推送消息(推送消息时使用)
  2. 待确认消息(确认消息时使用)

又因为我们需要随时检查并在符合条件的情况下进行gc,而gc时会改变持久化消息在文件当中的偏移量

因此gc之后,内存级的待确认消息当中的消息体的偏移量就“野”了,变成了“野”偏移量了

因此我们就要在gc的时候随时更新待确认消息当中的偏移量

因此我们就要能够根据某个字段来快速找到对应的消息结构体
所以待确认消息要用哈希表来存储 <消息ID,消息结构体::ptr>

using MessagePtr=std::shared_ptr<Message>;
std::unordered_map<std::string,MessagePtr> waitack_map;

2.待推送消息链表

对于待推送消息,我们选择list,而不是vector,queue,unordered_map

1.为何不选哈希表

因为我们的待推送消息链表无需进行基于key值的随机访问,所以用哈希表的意义不大

2.vector与list

  1. 因为我们作为消息队列,要确保消息推送时的有序性,因此没有随机访问需求,vector最大的优势发挥不出来
  2. 因为消息推送是有序的:需要进行“头删尾插”或者“头插尾删”,也就是在容器两端进行操作,而vector的头插和头删虽说可以利用insert和erase来完成,但是效率很低,而list效率较高

因此在vector和list当中,我们选择list

3.queue和list

  1. 因为我们在服务重启之后,进行gc时,要把待推送消息链表当中的数据都插入到持久化消息哈希表当中,因此需要遍历待推送消息
  2. 而queue是不支持在不删除元素的情况下进行遍历的,尽管其两端操作效率更高【因为底层是deque】,但是没法遍历,没办法,只能选list

因此在queue和list当中,我们选择list

4.deque和list

deque的两端操作效率比list高【因为内存分配快,缓存命中率高】,遍历效率也高于list【缓存命中率高】

但是:
它们性能上的差异并不大,而且list更加灵活,我们的项目较为复杂,采用list能有更好的扩展性,当然,如果大家想用deque也可以的

std::list<MessagePtr> waitpush_list;

3. 持久化消息哈希表

服务重启时,我们需要读取持久化消息,此时消息文件当中的数据有些是无效的,既然都要读取,那何不顺便gc一下呢?

而gc之后加载到内存当中消息放在哪里呢?

  1. 待推送消息链表??
    可以这么搞,这样的话持久化消息可以被扩展为能够被消费者主动拉取的消息,但是消息确认时,它不太适合快速查找来删除该持久化消息
  2. 待确认哈希表?
    不合适,因为消息都还没有被推送,那何谈待确认,不符合这个哈希表的任务,尽管实现上可以,但是代码不优雅

因此我们就再搞一个持久化消息哈希表<消息ID,消息结构体::ptr>
有了它之后,gc的时候直接改这个持久化消息哈希表即可,而无需改待确认消息哈希表了,因为各司其职,见名知义,代码更优雅

而且value都是智能指针,只要其资源被改了,那么所有的智能指针所访问到的也就都是修改后的新版本

std::unordered_map<std::string,MessagePtr> durable_map;

4.对外接口与包含成员

对外接口:

  • 发布消息(增)
  • 确认消息(删)
  • 垃圾回收(消息恢复)
  • 获取队首消息(进行消息推送)
  • 销毁该队列所有消息(删除队列时要用)

包含成员:

  • 待确认消息哈希表
  • 待推送消息哈希表
  • 持久化消息哈希表
  • 持久化消息总数
  • 持久化有效消息总数
  • 互斥锁
  • 队列名
  • 消息持久化管理模块句柄

因为我们的这个队列消息管理模块可能会同时被多线程访问,因此需要加上互斥锁保证线程安全

四、总体消息管理模块设计

其实就是把队列消息管理模块组织一下,存到一个哈希表当中而已

对外接口:

  • 初始化队列消息管理结构
  • 销毁队列消息管理结构
  • 发布消息
  • 确认消息
  • 获取队首消息

包含成员:

  • 互斥锁
  • unordered_map<队列名,队列消息管理模块::ptr>
  • 消息文件所在目录

五、消息持久化管理模块实现

1.构造函数

构造函数其实就是初始化:

  1. _datafile
  2. _tmpfile
    然后创建消息文件目录,最后创建这两个文件
MessageMapper(const std::string &basedir, const std::string &qname)
{
    if (!FileHelper::createDir(basedir))
    {
    default_fatal("消息持久化管理模块句柄初始化失败,因为创建消息文件目录失败, 目录名: %s",basedir.c_str());
        abort();
    }
    std::string dir = basedir;
    if (dir.back() != '/')
        dir.push_back('/');
    _datafile = dir + qname + data_suffix;
    _tmpfile = dir + qname + tmp_suffix;
    if (!createFile())
    {
    default_fatal("消息持久化管理模块句柄初始化失败,因为创建数据和临时文件失败, 数据文件名: %s , 临时文件名: %s",_datafile.c_str(),_tmpfile.c_str());
        abort();
    }
}

2.删除文件

注意: 析构函数直接用编译器默认生成的即可,而无需调用removeFile
否则就是画蛇添足,好不容易持久化好了,析构的时候又删除了,典型的画蛇添足

bool removeFile()
{
    if (!FileHelper::removeFile(_datafile))
    {
    default_fatal("删除数据文件失败, 数据文件名: %s",_datafile.c_str());
        return false;
    }
    if (!FileHelper::removeFile(_tmpfile))
    {
    default_fatal("删除临时文件失败, 临时文件名: %s",_datafile.c_str());
        return false;
    }
    return true;
}

3.插入消息

因为gc的时候我们要读取数据文件,将其中的有效信息写到临时文件当中

也就是说我们既有向数据文件当中写入消息的需求,也有向临时文件当中写入消息的需求

因此我们单拎出一个函数来,说白了,函数签名就是这个:

bool _insert(const std::string &filename, MessagePtr &mp);

下面我们就要想:插入一个消息一共分为几步:

  1. 把mp当中的有效载荷序列化拿到一个string load
  2. 拿到文件总大小(filesz),拿到load的长度len(类型:size_t)
  3. 把长度写入文件,大小是sizeof(size_t)
  4. 把load写入文件
  5. 设置mp的offset和len这两个字段
    【注意】:消息的offset指向的是有效载荷的起始位置,而不是消息长度的起始位置
    在这里插入图片描述
    就想把大象放到冰箱一样,一步一步来就行

1.有效载荷序列化

// 1.序列化有效载荷,拿到消息长度
std::string load = mp->valid().SerializeAsString();

2.拿到文件大小和有效载荷长度

// 2.拿到文件大小(偏移量)和有效载荷长度
size_t offset = FileHelper::size(filename);
size_t len = load.size();

3.把len写入文件,写sizeof(size_t)个字节

这是我们的write函数,第二个参数类型是const char*
而不要搞成string类型,因为我们还要写入长度(size_t)类型,主要是要保证其所占字节数恒为sizeof(size_t)啊

static bool write(const std::string &filename,const char* str, size_t offset, size_t len);
// 3.把len写入文件,写sizeof(size_t)个字节
if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t)))
{
default_fatal("消息的长度写入文件失败, 消息ID: %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());
    return false;
}

4.把有效载荷写入文件

static bool write(const std::string &filename, const std::string &str);
// 4.把有效载荷写入文件
if (!FileHelper::write(filename, load))
{
default_fatal("消息的有效载荷写入文件失败, 消息ID:  %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());
    return false;
}

5.设置mp的offset和len这两个字段

mp->set_len(len);
mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!

6._insert完整代码

bool _insert(const std::string &filename, MessagePtr &mp)
{
    // 1.序列化有效载荷,拿到消息长度
    std::string load = mp->valid().SerializeAsString();
    size_t len = load.size();

    // 2.拿到文件大小(偏移量)
    size_t offset = FileHelper::size(filename);

    // 3.把len写入文件,写sizeof(size_t)个字节
    if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t)))
    {
default_fatal("消息的长度写入文件失败, 消息ID: %s, 文件名: ",mp->valid().properities().msg_id().c_str(),filename.c_str());
        return false;
    }

    // 4.把有效载荷写入文件
    if (!FileHelper::write(filename, load))
    {
default_fatal("消息的有效载荷写入文件失败, 消息ID: %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());
        return false;
    }
    // 5.设置mp的offset和len这两个字段
    mp->set_len(len);
    mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!
    return true;
}

4.删除消息

因为我们采用的是伪删除,所以删除消息时要给我【消息持久化管理模块】这个MessagePtr
bool erase(MessagePtr &mp);

同样的,步骤是:

  1. 将有效标记位置为无效
  2. 将有效载荷序列化为 string data
  3. 拿到偏移量并检查长度是否相等
  4. 将修改后的mp写到offset位置,写len个长度

1.将有效标记位置为无效

// 1. 将有效标志置为无效
mp->mutable_valid()->set_valid("0");

2.序列化

// 2. 序列化
std::string data = mp->valid().SerializeAsString();

3.拿到偏移量并检查长度是否相等

// 3. 拿到偏移量和检查长度是否相等
size_t offset = mp->offset();
if (mp->len() != data.size())
{
    default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());
    return false;
}

4.将修改后的mp写到offset位置,写len个长度

// 4. 写入数据文件
if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len()))
{
    default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());
    return false;
}

5.erase完整代码

bool erase(MessagePtr &mp)
{
    // 1. 将有效标志置为无效
    mp->mutable_valid()->set_valid("0");
    // 2. 序列化
    std::string data = mp->valid().SerializeAsString();
    // 3. 拿到偏移量和检查长度是否相等
    size_t offset = mp->offset();
    if (mp->len() != data.size())
    {
        default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());
        return false;
    }
    // 4. 写入数据文件
    if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len()))
    {
        default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());
        return false;
    }
    return true;
}

5.gc垃圾回收

std::list<MessagePtr> gc();

步骤:

  1. 读取数据文件,将有效消息写入临时文件【加载有效信息】
  2. 删除数据文件
  3. 重命名临时文件

1.加载有效信息

步骤:

  1. 拿到文件总大小,并用offset来依次读取,直到offset>=文件总大小
  2. 先读取8字节长度,拿到len(offset+=sizeof(size_t))
  3. 读取len个字节的数据(offset+=len)
  4. 定义MessagePtr mp,反序列化拿到有效载荷
  5. 看mp是否有效,若有效,则进入下一步,否则continue,继续下一轮循环
  6. 复用_insert,将mp写入临时文件
  7. 将mp添加到需要返回的list当中
1.循环框架的构建
size_t offset = 0, sz = FileHelper::size(_datafile);
while (offset < sz)
{
    // 先读取长度
    offset += sizeof(size_t);
    // 再读取len个字节的有效数据
    offset += len;
    // 看是否有效,若有效则写入文件
}
2.读取长度

第二个参数是char*

static bool read(const std::string &filename, char* return_str, size_t offset, size_t len);
// 先读取长度
size_t len = 0;
if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t)))
{
    default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());
    return msg_list;
}
offset += sizeof(size_t);
3.读取有效载荷

这里用string来读取
有两种方式读取:

对于string来说,有两种方法可以拿到内部char*类型的数组地址

1. &body[0](body[0]就是数组首元素【char类型】),&一下就变成数据首元素的地址【char*】类型(是安全的)

2. const_cast<char*>(body.c_str()):
body.c_str()const char*类型,然后用const_cast进行强转,去除const修饰【不推荐,强转是下下策,能不用就别用】
// 再读取len个字节的有效数据
std::string body(len, '\0');
if (!FileHelper::read(_datafile, &body[0], offset, len))
{
default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());
    return msg_list;
}
offset += len;
4.反序列化拿到有效载荷
MessagePtr mp = std::make_shared<Message>();
mp->mutable_valid()->ParseFromString(body);
5.看是否有效,进行写入
// 若为有效,则写入临时文件,并插入到list当中
if (mp->valid().valid() == "1")
{
    // 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段
    if (!_insert(_tmpfile, mp))
    {
    default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());
        return msg_list;
    }
    // 放到链表当中
    msg_list.push_back(mp);
}
读取数据文件完整代码
std::list<MessagePtr> msg_list;
// 依次读取_datafile,将有效载荷进行序列化
size_t offset = 0, sz = FileHelper::size(_datafile);
while (offset < sz)
{
    // 先读取长度
    size_t len = 0;
    if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t)))
    {
        default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());
        return msg_list;
    }
    offset += sizeof(size_t);
    // 再读取len个字节的有效数据
    std::string body(len, '\0');
    if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全
    // if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用
    {
        default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());
        return msg_list;
    }
    offset += len;
    // 反序列化拿到有效载荷
    MessagePtr mp = std::make_shared<Message>();
    mp->mutable_valid()->ParseFromString(body);
    // 若为有效,则写入临时文件,并插入到list当中
    if (mp->valid().valid() == "1")
    {
        // 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段
        if (!_insert(_tmpfile, mp))
        {
            default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());
            return msg_list;
        }
        // 放到链表当中
        msg_list.push_back(mp);
    }
}

2.删除数据文件,重命名临时文件

// 删除数据文件,重命名临时文件
if (!FileHelper::removeFile(_datafile))
{
    default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());
    return msg_list;
}
if (!FileHelper::rename(_tmpfile, _datafile))
{
    default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());
    return msg_list;
}

3.完整代码

std::list<MessagePtr> gc()
{
    std::list<MessagePtr> msg_list;
    // 依次读取_datafile,将有效载荷进行序列化
    size_t offset = 0, sz = FileHelper::size(_datafile);
    while (offset < sz)
    {
        // 先读取长度
        size_t len = 0;
        if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t)))
        {
            default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());
            return msg_list;
        }
        offset += sizeof(size_t);
        // 再读取len个字节的有效数据
        std::string body(len, '\0');
        if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全
        // if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用
        {
            default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());
            return msg_list;
        }
        offset += len;
        // 反序列化拿到有效载荷
        MessagePtr mp = std::make_shared<Message>();
        mp->mutable_valid()->ParseFromString(body);
        // 若为有效,则写入临时文件,并插入到list当中
        if (mp->valid().valid() == "1")
        {
            // 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段
            if (!_insert(_tmpfile, mp))
            {
                default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());
                return msg_list;
            }
            // 放到链表当中
            msg_list.push_back(mp);
        }
    }
    // 删除数据文件,重命名临时文件
    if (!FileHelper::removeFile(_datafile))
    {
        default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());
        return msg_list;
    }
    if (!FileHelper::rename(_tmpfile, _datafile))
    {
        default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());
        return msg_list;
    }
    // 最后返回list即可
    return msg_list;
}

6.完整代码

using MessagePtr = std::shared_ptr<Message>;

const std::string data_suffix = ".data";
const std::string tmp_suffix = ".tmp";

class MessageMapper
{
public:
    MessageMapper(const std::string &basedir, const std::string &qname)
    {
        if (!FileHelper::createDir(basedir))
        {
            default_info("消息持久化管理模块句柄初始化失败,因为创建消息文件目录失败, 目录名: %s",basedir.c_str());
            abort();
        }
        std::string dir = basedir;
        if (dir.back() != '/')
            dir.push_back('/');
        _datafile = dir + qname + data_suffix;
        _tmpfile = dir + qname + tmp_suffix;
        if (!createFile())
        {
            default_info("消息持久化管理模块句柄初始化失败,因为创建数据和临时文件失败, 数据文件名: %s , 临时文件名: %s",_datafile.c_str(),_tmpfile.c_str());
            abort();
        }
    }

    // 注意:消息持久化管理类不能在析构的时候删除文件(这就是画蛇添足,好不容易持久化好了,析构又把它删了,那不白持久化了)
    bool createFile()
    {
        if (!FileHelper::createFile(_datafile))
        {
            default_info("创建数据文件失败, 数据文件名: %s",_datafile.c_str());
            return false;
        }
        if (!FileHelper::createFile(_tmpfile))
        {
            default_info("创建临时文件失败, 临时文件名: %s",_tmpfile.c_str());
            return false;
        }
        return true;
    }

    bool removeFile()
    {
        if (!FileHelper::removeFile(_datafile))
        {
            default_info("删除数据文件失败, 数据文件名: %s",_datafile.c_str());
            return false;
        }
        if (!FileHelper::removeFile(_tmpfile))
        {
            default_info("删除临时文件失败, 临时文件名: %s",_tmpfile.c_str());
            return false;
        }
        return true;
    }

    bool insert(MessagePtr &mp)
    {
        // 因为gc的时候是要向临时文件写入数据的,因此把insert单独提出来
        return _insert(_datafile, mp);
    }

    bool erase(MessagePtr &mp)
    {
        // 1. 将有效标志置为无效
        mp->mutable_valid()->set_valid("0");
        // 2. 序列化
        std::string data = mp->valid().SerializeAsString();
        // 3. 拿到偏移量和检查长度是否相等
        size_t offset = mp->offset();
        if (mp->len() != data.size())
        {
            default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());
            return false;
        }
        // 4. 写入数据文件
        if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len()))
        {
            default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());
            return false;
        }
        return true;
    }

    std::list<MessagePtr> gc()
    {
        std::list<MessagePtr> msg_list;
        // 依次读取_datafile,将有效载荷进行序列化
        size_t offset = 0, sz = FileHelper::size(_datafile);
        while (offset < sz)
        {
            // 先读取长度
            size_t len = 0;
            if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t)))
            {
                default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());
                return msg_list;
            }
            offset += sizeof(size_t);
            // 再读取len个字节的有效数据
            std::string body(len, '\0');
            if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全
            // if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用
            {
                default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());
                return msg_list;
            }
            offset += len;
            // 反序列化拿到有效载荷
            MessagePtr mp = std::make_shared<Message>();
            mp->mutable_valid()->ParseFromString(body);
            // 若为有效,则写入临时文件,并插入到list当中
            if (mp->valid().valid() == "1")
            {
                // 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段
                if (!_insert(_tmpfile, mp))
                {
                    default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());
                    return msg_list;
                }
                // 放到链表当中
                msg_list.push_back(mp);
            }
        }
        // 删除数据文件,重命名临时文件
        if (!FileHelper::removeFile(_datafile))
        {
            default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());
            return msg_list;
        }
        if (!FileHelper::rename(_tmpfile, _datafile))
        {
            default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());
            return msg_list;
        }
        // 最后返回list即可
        return msg_list;
    }

private:
    bool _insert(const std::string &filename, MessagePtr &mp)
    {
        // 1.序列化有效载荷,拿到消息长度
        std::string load = mp->valid().SerializeAsString();
        size_t len = load.size();

        // 2.拿到文件大小(偏移量)
        size_t offset = FileHelper::size(filename);

        // 3.把len写入文件,写sizeof(size_t)个字节
        if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t)))
        {
            default_info("消息的长度写入文件失败, 消息ID: %s , 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());
            return false;
        }

        // 4.把有效载荷写入文件
        if (!FileHelper::write(filename, load))
        {
            default_info("消息的有效载荷写入文件失败, 消息ID: %s , 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());
            return false;
        }
        // 5.设置mp的offset和len这两个字段
        mp->set_len(len);
        mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!
        return true;
    }

    std::string _datafile;
    std::string _tmpfile;
};

7.是否需要加锁???

在这里插入图片描述
因此消息持久化管理模块无需加锁

六、队列消息管理模块实现

1.成员与构造函数

public:

QueueMessageManager(const std::string &basedir, const std::string &qname)
: _qname(qname), _mapper(basedir, qname), _total_count(0), _valid_count(0) {}

private:

std::string _qname;
std::mutex _mutex;
MessageMapper _mapper;
std::list<MessagePtr> _waitpush_list;
std::unordered_map<std::string, MessagePtr> _waitack_map;
std::unordered_map<std::string, MessagePtr> _durable_map;
size_t _total_count;
size_t _valid_count;

这里没有在构造的时候就恢复历史消息,是为了加快对象的构造过程
以便减少总体消息管理模块的锁竞争

这句话是怎么个意思呢?
写个伪代码,大家就一清二楚了

总体消息管理模块当中的初始化队列消息管理模块的函数
void initQueueMessageManager(const std::string &qname)
{
    QueueMessageManager::ptr qmmp;
    {
        // 0. 加锁
        std::unique_lock<std::mutex> ulock(_mutex);
        // 1. 查找是否存在
        if (_qmsg_map.count(qname))
            return;
        // 2. 直接插入
        qmmp = std::make_shared<QueueMessageManager>(_basedir, qname);
        _qmsg_map.insert(std::make_pair(qname, qmmp));
    }
    // 为了降低锁冲突,因此把recovery单提出来搞
    qmmp->recovery();
}
private:
std::mutex _mutex;
std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;
std::string _basedir;

也就是说,因为 总体消息管理模块中的哈希表需要保证线程安全,
因此对象构造依旧属于临界区代码
所以为了让临界区代码执行的快一些(降低锁冲突),又因为消息持久化管理模块本来就加锁了,所以我们可以放心大胆的把recovery单拎出去

2.恢复历史消息

  1. 加锁
  2. 用gc返回的值初始化待推送消息链表
  3. 将gc返回的值放到持久化哈希表中
  4. 更新持久化消息总数和有效消息总数
void recovery()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    // 1. 恢复历史消息
    _waitpush_list = _mapper.gc();
    // 2. 将gc后的消息放到持久化哈希表中
    for (auto &mp : _waitpush_list)
    {
        _durable_map[mp->valid().properities().msg_id()] = mp;
    }
    // 3. 更新持久化消息总数和有效消息总数
    _total_count = _valid_count = _durable_map.size();
}

3.发布消息

1.函数签名

因为消息是依附于队列而存在的,因此如果该队列是非持久化队列,那么该消息即使持久化了,也没有价值,因为无法被消费

因此发布消息时我们需要知道对应队列是持久化的
现在有两种选择:

  1. 包含队列文件,去查对应队列是否持久化了【不好,因为会增大耦合度】
  2. 加一个参数,表示对应的队列是否持久化了
// 3. 消息的基本属性
message BasicProperities
{
    string msg_id = 1;
    DeliveryMode mode = 2;
    string routing_key = 3;
}
message Message 
{
    message ValidLoad
    {
        string body = 1;
        BasicProperities properities = 2;
        string valid = 3;
    }
    ValidLoad valid = 1;
    uint64 offset = 2;
    uint64 len = 3;  
}

下面我们来看,消息结构体当中,那些是需要用户传入的?
BasicProperities properities和string body是需要用户传入的
而其余的valid,offset和len都无需用户传入

因此函数签名是:

bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode);

2.步骤

  1. 构建消息智能指针
  2. 看是否需要持久化
  3. 如果需要,则进行第四步,否则跳到第五步
  4. 在文件中插入该消息并且放到持久化消息哈希表中,更新total_count和valid_count
  5. 放到待推送消息链表当中
0.要不要遍历消息链表

我们常规来说,一想到的肯定是遍历啊。
可是有问题:

首先,我们要想:为何我们要遍历?

是为了防止推送重复消息,可是我们msg_id是UUID啊,出现重复的率几乎为0,因此如果真的出现了重复
那么我们有理由认为,这是调用者要求我们重新推送,所以我们不做检查,也就是不遍历消息链表

1.构建消息智能指针
// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode
// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化

// 1. 构建消息智能指针
MessagePtr mp = std::make_shared<Message>();
mp->mutable_valid()->set_body(body);
mp->mutable_valid()->set_valid("1");

mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());
mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());
DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;
mp->mutable_valid()->mutable_properities()->set_mode(final_mode);
2.持久化

在持久化这里,_mapper的操作和那三个数据结构,还有两个整形变量
,他们都需要受到加锁保护,因此下面就需要加锁了

而上面构造mp,因为线程独享栈空间,局部变量是线程安全的,所以上面无需加锁(降低锁冲突)

// 加锁
std::unique_lock<std::mutex> ulock(_mutex);

// 2. 看是否需要持久化
if (final_mode == DURABLE)
{
    if (!_mapper.insert(mp))
    {
        default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());
        return false;
    }
    // 放到持久化哈希表中
    _durable_map[bp->msg_id()] = mp;
    _total_count++;
    _valid_count++;
}
3.放到待推送链表当中
// 3. 放到待推送链表当中
_waitpush_list.push_back(mp);

3.完整代码

bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode)
{
    // 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode
    // 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化

    // 1. 构建消息智能指针
    MessagePtr mp = std::make_shared<Message>();
    mp->mutable_valid()->set_body(body);
    mp->mutable_valid()->set_valid("1");

    mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());
    mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());
    DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;
    mp->mutable_valid()->mutable_properities()->set_mode(final_mode);

    // 加锁
    std::unique_lock<std::mutex> ulock(_mutex);

    // 2. 看是否需要持久化
    if (final_mode == DURABLE)
    {
        if (!_mapper.insert(mp))
        {
        default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());
            return false;
        }
        // 放到持久化哈希表中
        _durable_map[bp->msg_id()] = mp;
        _total_count++;
        _valid_count++;
    }

    // 3. 放到待推送链表当中
    _waitpush_list.push_back(mp);
    return true;
}

4.确认消息

1.函数签名

删除消息只需要用消息ID在待确认哈希表当中就能查到对应的MessagePtr,就能够拿到该消息的所有信息
因此,函数签名是:

bool ackMessage(const std::string &msg_id);

2.查找是否存在

跟发布消息不同,确认消息这里需要查找是否存在。
因为不存在就代表对应消息早已被确认过了,这样反而能提高效率
毕竟,本来就要根据msg_id才能拿到MessagePtr

步骤:

  1. 在待确认哈希表当中查找
  2. 看是否持久化
  3. 若持久化,则进行第四步,否则跳到第五步
  4. 在文件中删除该消息,从持久化哈希表当中删除,更新valid_count【删除后进行gc检测并执行】
  5. 在待确认哈希表当中删除
1.在待确认哈希表当中查找
// 1. 在待确认哈希表当中查找
std::unique_lock<std::mutex> ulock(_mutex);
auto iter = _waitack_map.find(msg_id);
if (iter == _waitack_map.end())
    return true;
2.持久化
// 2. 看是否持久化
MessagePtr mp = iter->second;
if (mp->valid().properities().mode() == DURABLE)
{
    if (!_mapper.erase(mp))
    {
    default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());
        return false;
    }
    // 从持久化哈希表当中删除
    _durable_map.erase(msg_id);
    _valid_count--;
    // 每次删除时看看是否需要gc
    check_and_gc();
}
3.从待确认消息中哈希表当中删除
// 3. 从待确认哈希表当中删除
_waitack_map.erase(msg_id);
3.完整代码
bool ackMessage(const std::string &msg_id)
{
    // 1. 在待确认哈希表当中查找
    std::unique_lock<std::mutex> ulock(_mutex);
    auto iter = _waitack_map.find(msg_id);
    if (iter == _waitack_map.end())
        return true;
    // 2. 看是否持久化
    MessagePtr mp = iter->second;
    if (mp->valid().properities().mode() == DURABLE)
    {
        if (!_mapper.erase(mp))
        {
        default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());
            return false;
        }
        // 从持久化哈希表当中删除
        _durable_map.erase(msg_id);
        _valid_count--;
        // 每次删除时看看是否需要gc
        check_and_gc();
    }
    // 3. 从待确认哈希表当中删除
    _waitack_map.erase(msg_id);
    return true;
}

5.推送消息

其实就是取出待推送链表的队头元素,放到待确认哈希表当中
并返回该MessagePtr

MessagePtr front()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    if (_waitpush_list.empty())
    {
        return MessagePtr();
    }
    MessagePtr mp = _waitpush_list.front();
    _waitpush_list.pop_front();
    _waitack_map[mp->valid().properities().msg_id()] = mp;
    return mp;
}

6.销毁队列相关消息

既包括持久化消息文件,也包括内存级消息数据结构

// 需要提供销毁该队列所有信息的方法(删除队列时要用)
void clear()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    _mapper.removeFile();
    _waitpush_list.clear();
    _waitack_map.clear();
    _durable_map.clear();
    _valid_count = _total_count = 0;
}

7.用于测试的一堆get接口

// 不能修饰为const成员函数,因为需要申请释放锁
size_t waitpush_count()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    return _waitpush_list.size();
}

size_t waitack_count()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    return _waitack_map.size();
}

size_t total_count()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    return _total_count;
}

size_t valid_count()
{
    std::unique_lock<std::mutex> ulock(_mutex);
    return _valid_count;
}

8.check_and_gc

1.要不要加锁

这里不能加锁,因为:

  1. 我们这个check_and_gc函数不允许被外界主动调用,是一个private成员函数
  2. 只有ackMessage函数才会调用我们这个check_and_gc函数,而ackMessage当中已经加了锁,因此如果check_and_gc也加锁的话,那么就死锁了
  3. 其实大家可以把check_and_gc当作就是ackMessage当中的代码,只不过因为功能相对独立,因此提取出来封装为单独一个函数,实现解耦

那我可以加两个锁吗?
这里也加一个锁?
这就是典型的墨守成规,访问同一个临界资源,用两个锁来保护
这是互斥锁的非常错误的使用方法

check_and_gc自己搞一个锁,没有任何意义

2.框架

bool check()
{
    return _total_count > 2000 && _valid_count * 2 < _total_count;
}

// 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)
// 但是我们依然选择显示更新
void check_and_gc()
{
    if (!check())
        return;
    //开始gc
    // ...
}

3.步骤

  1. 调用_mapper的gc,拿到包含有效消息的MessagePtr的list
  2. 用这个list来更新持久化哈希表
  3. 更新持久化消息总数和持久化有效消息总数

4.一个问题

细心的小伙伴可能已经发现了一个问题:
我们为每个消息所对应的Message对象从头到尾只创建了一个实例
利用智能指针实现同一个实例对象的共享

而消息的offset和len字段早已在消息持久化管理模块的_insert函数当中更新了

因此我们的持久化哈希表当中的MessagePtr看似没变,实际上已经改变了,但是这个比较细节,算是一个"隐藏"特性
【shared_ptr共享所管理的资源,均保持可见性】

但是如果不更新的话,代码的可读性和健壮性以及容错机制都不太好,所以我们在这里采取显式更新

5.调用_mapper的gc,拿到包含有效消息的MessagePtr的list

std::list<MessagePtr> valid_list = _mapper.gc();

6.更新持久化消息总数和持久化有效消息总数

_valid_count = _total_count = valid_list.size();

7.更新持久化哈希表

就是遍历有效消息链表,然后拿到msg_id去哈希表当中查找并更新
在这里还可以对那些持久化的有效消息,但是没有在持久化哈希表当中的数据进行处理
处理方式就是将其重新放到待推送链表当中进行推送

for (auto &mp : valid_list)
{
    std::string msg_id = mp->valid().properities().msg_id();
    // 1.在持久化哈希表中查找该消息
    auto iter = _durable_map.find(msg_id);
    if (iter == _durable_map.end())
    {
        std::ostringstream oss;
        oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";
        // 将其重新放到待推送链表当中,进行推送
        _waitpush_list.push_back(mp);
        // 插入到持久化哈希表中
        _durable_map.insert(std::make_pair(msg_id, mp));
        continue;
    }
    // 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]
    // 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了
    // 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下
    // 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)
    iter->second->set_offset(mp->offset());
    iter->second->set_len(mp->len());
}
// 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)
// 但是我们依然选择显式更新
void check_and_gc()
{
    if (!check())
        return;
    std::list<MessagePtr> valid_list = _mapper.gc();
    // 按照valid_list当中的MessagePtr来更新持久化哈希表
    for (auto &mp : valid_list)
    {
        std::string msg_id = mp->valid().properities().msg_id();
        // 1.在持久化哈希表中查找该消息
        auto iter = _durable_map.find(msg_id);
        if (iter == _durable_map.end())
        {
            std::ostringstream oss;
            oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";
            // 将其重新放到待推送链表当中,进行推送
            _waitpush_list.push_back(mp);
            // 插入到持久化哈希表中
            _durable_map.insert(std::make_pair(msg_id, mp));
            continue;
        }
        // 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]
        // 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了
        // 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下
        // 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)
        iter->second->set_offset(mp->offset());
        iter->second->set_len(mp->len());
    }
    _valid_count = _total_count = valid_list.size();
}

9.完整代码

class QueueMessageManager
{
public:
    using ptr = std::shared_ptr<QueueMessageManager>;
    QueueMessageManager(const std::string &basedir, const std::string &qname)
        : _qname(qname), _mapper(basedir, qname), _total_count(0), _valid_count(0) {}

    void recovery()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        // 1. 恢复历史消息
        _waitpush_list = _mapper.gc();
        // 2. 将gc后的消息放到持久化哈希表中
        for (auto &mp : _waitpush_list)
        {
            _durable_map[mp->valid().properities().msg_id()] = mp;
        }
        // 3. 更新持久化消息总数和有效消息总数
        _total_count = _valid_count = _durable_map.size();
    }

    bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode)
    {
        // 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode
        // 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化

        // 1. 构建消息智能指针
        MessagePtr mp = std::make_shared<Message>();
        mp->mutable_valid()->set_body(body);
        mp->mutable_valid()->set_valid("1");

        mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());
        mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());
        DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;
        mp->mutable_valid()->mutable_properities()->set_mode(final_mode);

        // 加锁
        std::unique_lock<std::mutex> ulock(_mutex);

        // 2. 看是否需要持久化
        if (final_mode == DURABLE)
        {
            if (!_mapper.insert(mp))
            {
            default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());
                return false;
            }
            // 放到持久化哈希表中
            _durable_map[bp->msg_id()] = mp;
            _total_count++;
            _valid_count++;
        }

        // 3. 放到待推送链表当中
        _waitpush_list.push_back(mp);
        return true;
    }

    bool ackMessage(const std::string &msg_id)
    {
        // 1. 在待确认哈希表当中查找
        std::unique_lock<std::mutex> ulock(_mutex);
        auto iter = _waitack_map.find(msg_id);
        if (iter == _waitack_map.end())
            return true;
        // 2. 看是否持久化
        MessagePtr mp = iter->second;
        if (mp->valid().properities().mode() == DURABLE)
        {
            if (!_mapper.erase(mp))
            {
            default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());
                return false;
            }
            // 从持久化哈希表当中删除
            _durable_map.erase(msg_id);
            _valid_count--;
            // 每次删除时看看是否需要gc
            check_and_gc();
        }
        // 3. 从待确认哈希表当中删除
        _waitack_map.erase(msg_id);
        return true;
    }

    MessagePtr front()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        if (_waitpush_list.empty())
        {
            return MessagePtr();
        }
        MessagePtr mp = _waitpush_list.front();
        _waitpush_list.pop_front();
        _waitack_map[mp->valid().properities().msg_id()] = mp;
        return mp;
    }

    // 需要提供销毁该队列所有信息的方法(删除队列时要用)
    void clear()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        _mapper.removeFile();
        _waitpush_list.clear();
        _waitack_map.clear();
        _durable_map.clear();
        _valid_count = _total_count = 0;
    }

    // 不能修饰为const成员函数,因为需要申请释放锁
    size_t waitpush_count()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        return _waitpush_list.size();
    }

    size_t waitack_count()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        return _waitack_map.size();
    }

    size_t total_count()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        return _total_count;
    }

    size_t valid_count()
    {
        std::unique_lock<std::mutex> ulock(_mutex);
        return _valid_count;
    }

private:
    bool check()
    {
        return _total_count > 2000 && _valid_count * 2 < _total_count;
    }

    // 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)
    // 但是我们依然选择显示更新
    void check_and_gc()
    {
        if (!check())
            return;
        std::list<MessagePtr> valid_list = _mapper.gc();
        // 按照valid_list当中的MessagePtr来更新持久化哈希表
        for (auto &mp : valid_list)
        {
            std::string msg_id = mp->valid().properities().msg_id();
            // 1.在持久化哈希表中查找该消息
            auto iter = _durable_map.find(msg_id);
            if (iter == _durable_map.end())
            {
                std::ostringstream oss;
                oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";
                // 将其重新放到待推送链表当中,进行推送
                _waitpush_list.push_back(mp);
                // 插入到持久化哈希表中
                _durable_map.insert(std::make_pair(msg_id, mp));
                continue;
            }
            // 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]
            // 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了
            // 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下
            // 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)
            iter->second->set_offset(mp->offset());
            iter->second->set_len(mp->len());
        }
        _valid_count = _total_count = valid_list.size();
    }

    std::string _qname;
    std::mutex _mutex;
    MessageMapper _mapper;
    std::list<MessagePtr> _waitpush_list;
    std::unordered_map<std::string, MessagePtr> _waitack_map;
    std::unordered_map<std::string, MessagePtr> _durable_map;
    size_t _total_count;
    size_t _valid_count;
};

七、总体消息管理模块实现

1.介绍

总体消息管理模块负责管理所有队列的消息
也就是管理所有的队列消息管理模块句柄

因此,它的成员:互斥锁,哈希表,basedir

private:
std::mutex _mutex;
std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;
std::string _basedir;

接口:

  1. 初始化和销毁队列消息管理模块(增,删)
  2. 队列消息管理模块所提供的所有接口(全都额外加一个queue_name来指定对那个队列进行什么操作 – 类似于查+操作)
  3. 唯一需要注意的是:只有查操作才需要互斥锁,是为了保证哈希表的线程安全。
    而操作则无需加锁,因为所有的队列消息管理模块本来就都需要加锁

2.加锁问题

那能不能我总体消息管理模块这里对操作部分也加锁,然后队列消息模块当中不加锁呢?

画一下图:
在这里插入图片描述
综上:

队列消息管理模块必须加锁,因为它必存在被多线程同时访问的可能

了解了上面的设计之后,代码很easy了,直接给出了

3.完整代码

class MessageManager
{
public:
    using ptr = std::shared_ptr<MessageManager>;

    MessageManager(const std::string &basedir)
        : _basedir(basedir) {}

    void initQueueMessageManager(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            // 0. 加锁
            std::unique_lock<std::mutex> ulock(_mutex);
            // 1. 查找是否存在
            if (_qmsg_map.count(qname))
                return;
            // 2. 直接插入
            qmmp = std::make_shared<QueueMessageManager>(_basedir, qname);
            _qmsg_map.insert(std::make_pair(qname, qmmp));
        }
        // 为了降低锁冲突,因此把recovery单提出来搞
        qmmp->recovery();
    }

    void destroyQueueMessageManager(const std::string &qname)
    {
        // 这里必须是对象,如果是指针或者引用就野了
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
                return;
            }
            qmmp = iter->second;
            _qmsg_map.erase(iter);
        }
        // 为了降低锁冲突,因此把clear单提出来搞
        qmmp->clear();
    }

    bool publishMessage(const std::string &qname, const BasicProperities *bp, const std::string &body, DeliveryMode mode)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
                default_info("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");
                return false;
            }
            qmmp = iter->second;
        }
        return qmmp->publishMessage(bp, body, mode);
    }

    bool ackMessage(const std::string &qname, const std::string &msg_id)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_info("确认消息失败,因为该队列的消息管理模块句柄尚未初始化");
                return false;
            }
            qmmp = iter->second;
        }
        return qmmp->ackMessage(msg_id);
    }

    MessagePtr front(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_error("获取消息失败,因为该队列的消息管理模块句柄尚未初始化");
                return MessagePtr();
            }
            qmmp = iter->second;
        }
        return qmmp->front();
    }

    size_t waitpush_count(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_error("获取待推送消息数量失败,因为该队列的消息管理模块句柄尚未初始化");
                return 0;
            }
            qmmp = iter->second;
        }
        return qmmp->waitpush_count();
    }

    size_t waitack_count(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_error("获取待确认消息数量失败,因为该队列的消息管理模块句柄尚未初始化");
                return 0;
            }
            qmmp = iter->second;
        }
        return qmmp->waitack_count();
    }

    size_t total_count(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_error("获取持久化消息总数失败,因为该队列的消息管理模块句柄尚未初始化");
                return 0;
            }
            qmmp = iter->second;
        }
        return qmmp->total_count();
    }

    size_t valid_count(const std::string &qname)
    {
        QueueMessageManager::ptr qmmp;
        {
            std::unique_lock<std::mutex> ulock(_mutex);
            auto iter = _qmsg_map.find(qname);
            if (iter == _qmsg_map.end())
            {
            default_error("获取持久化有效消息数量失败,因为该队列的消息管理模块句柄尚未初始化");
                return 0;
            }
            qmmp = iter->second;
        }
        return qmmp->valid_count();
    }

private:
    std::mutex _mutex;
    std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;
    std::string _basedir;
};

八、测试代码

#include <gtest/gtest.h>
#include "../mqserver/message.hpp"

using namespace ns_mq;

MessageManager::ptr mmp;

std::vector<std::string> uuid_vec = {
    UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid()};

class MessageTest : public testing::Environment
{
public:
    virtual void SetUp()
    {
        mmp = std::make_shared<MessageManager>("./queue_message");
        mmp->initQueueMessageManager("queue1");
    }

    virtual void TearDown()
    {
        //mmp->destroyQueueMessageManager("queue1");
    }
};

// TEST(message_test, recovery_test)
// {
//     ASSERT_EQ(mmp->waitpush_count("queue1"), 3);
//     ASSERT_EQ(mmp->waitack_count("queue1"), 0);
//     ASSERT_EQ(mmp->total_count("queue1"), 3);
//     ASSERT_EQ(mmp->valid_count("queue1"), 3);
// }

TEST(message_test, publish_test)
{
    BasicProperities bp;
    bp.set_msg_id(uuid_vec[0]);
    bp.set_mode(DURABLE);
    bp.set_routing_key("news.music.#");

    ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-0", DURABLE), true);
    bp.set_msg_id(uuid_vec[1]);
    ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-1", DURABLE), true);
    bp.set_msg_id(uuid_vec[2]);
    ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-2", DURABLE), true);
    bp.set_msg_id(uuid_vec[3]);
    ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-3", DURABLE), true);
    bp.set_msg_id(uuid_vec[4]);
    ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-4", DURABLE), true);

    ASSERT_EQ(mmp->waitpush_count("queue1"), 5);
    ASSERT_EQ(mmp->waitack_count("queue1"), 0);
    ASSERT_EQ(mmp->total_count("queue1"), 5);
    ASSERT_EQ(mmp->valid_count("queue1"), 5);
}

TEST(message_test, get_test)
{
    MessagePtr mp = mmp->front("queue1");
    ASSERT_NE(mp.get(), nullptr);
    ASSERT_EQ(mp->valid().body(), std::string("hello-0"));

    mp = mmp->front("queue1");
    ASSERT_NE(mp.get(), nullptr);
    ASSERT_EQ(mp->valid().body(), std::string("hello-1"));

    mp = mmp->front("queue1");
    ASSERT_NE(mp.get(), nullptr);
    ASSERT_EQ(mp->valid().body(), std::string("hello-2"));

    ASSERT_EQ(mmp->waitpush_count("queue1"), 2);
    ASSERT_EQ(mmp->waitack_count("queue1"), 3);
    ASSERT_EQ(mmp->total_count("queue1"), 5);
    ASSERT_EQ(mmp->valid_count("queue1"), 5);
}

TEST(message_test, ack_test)
{
    ASSERT_EQ(mmp->ackMessage("queue1", uuid_vec[0]), true);
    ASSERT_EQ(mmp->ackMessage("queue1", uuid_vec[1]), true);

    ASSERT_EQ(mmp->waitpush_count("queue1"), 2);
    ASSERT_EQ(mmp->waitack_count("queue1"), 1);
    ASSERT_EQ(mmp->total_count("queue1"), 5);
    ASSERT_EQ(mmp->valid_count("queue1"), 3);
}

int main(int argc, char *argv[])
{
    testing::AddGlobalTestEnvironment(new MessageTest);
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

以上就是项目第五弹:队列消息管理模块的全部内容


原文地址:https://blog.csdn.net/Wzs040810/article/details/140357720

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!