自学内容网 自学内容网

【项目日记】仿mudou的高并发服务器 --- 实现缓冲区模块,通用类型Any模块,套接字模块

在这里插入图片描述

一个人知道自己为什么而活,
就可以忍受任何一种生活。
--- 尼采 ---

✨✨✨项目地址在这里 ✨✨✨

✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨


1 主从Reactor模型

这个项目的目标是实现一个可以高效处理请求的服务器,那么对于这样的一个服务器要如何实现呢?

这里采取主从Reactor模型的方案:

  1. 主Reactor模型负责对监听套接字进行管理,进行获取连接操作。
  2. 从属Reactor负责对连接的数据进行处理,并且为了保证线程安全,每一个从属Reactor都要绑定一个线程,这个连接的任务都在这个线程中完成。

简单来说就是这样的一个模型:
在这里插入图片描述
其中管理连接的对象是Connection对象,这个类是高并发服务器的核心部分,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!

主要的工作就是搭建起主从Reactor模型,但是这个模型不是一下子就可以写出来的。为了实现主从Reactor模型,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模型

2 基础功能封装

2.1 缓冲区 Buffer模块

Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:

  1. 需要支持字符串读取/写入
  2. 需要支持char*缓冲区读取/写入
  3. 需要正常按行读取 — 方便解析http请求

Buffer模块成员变量很简单:

  • vector<char>容器_buffer: 对内存空间进行管理
  • uint64_t _reader_idx 读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。
  • uint64_t _writer_idx 写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。

接下来实现一下基础功能:

  1. 构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个大小 BUFFER_DEFAULT_SIZE。
  2. 获取当前写入起始地址:_buffer空间的起始地址加入写偏移量即写入起始地址。
  3. 获取当前读取起始地址: _buffer空间的起始地址加入读偏移量即读取起始地址。
  4. 获取缓冲区末尾空闲空间大小:写偏移之后的空闲空间 ,总体空间大小减去写偏移就是写偏移之后的空间大小。
  5. 获取缓冲区起始空闲空间大小:读偏移之前的空闲空间,其实就是读偏移的大小。
  6. 获取可读数据大小:写偏移减去读偏移就就之间可读空间的大小!
  7. 读/写偏移向后移动
    * 先根据len判断是否小于可读数据大小 len必须小于可读数据大小,然后移动读偏移。
    * 向后移动必须小于当前后边的空闲空间大小,写入数据必须小于缓冲区剩余空间大小,不足就进行扩容!
  8. 确保可写空间足够 :
    * 末尾空闲空间足够 直接返回。
    * 末尾空间不足,但算上起始位置的空闲空间大小足够 ,将数据移动到起始位置。
    * 如果总空间不足 ,进行扩容,扩容到足够空间即可
  9. 写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续设计出针对string的写入、针对Buffer的写入以及写入 + 压入数据
  10. 读取数据:首先读取大小len必须小于可读取数据大小,然后拷贝数据出来。同样可以设计出针对string的读取、针对Buffer的读取以及读取+弹出数据
  11. 清空缓冲区:将偏移量归零即可!
  12. 读取一行数据:先找到换行符,然后进行读取。
// 缓冲区Buffer类
class Buffer
{
private:
    std::vector<char> _buffer;
    uint64_t _reader_idx;
    uint64_t _writer_idx;

private:
    char *Begin() { return &*_buffer.begin(); }

public:
    Buffer() : _buffer(BUFFER_DEFAULT_SIZE), _reader_idx(0), _writer_idx(0) {}
    // 获取当前写入起始地址
    char *WritePos() { return Begin() + _writer_idx; }
    // 获取当前读取起始地址
    char *ReadPos() { return Begin() + _reader_idx; }
    // 获取读取位置之前的空闲空间
    uint64_t HeadIdleSize() { return _reader_idx; }
    // 获取写入位置之后的空闲空间
    uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
    // 获取可读数据大小
    uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }
    // 读/写偏移移动
    void MoveReadOffset(uint64_t len)
    {
        if (len <= 0)
            return;
        assert(len <= ReadAbleSize());
        _reader_idx += len;
    }
    void MoveWriteOffset(uint64_t len)
    {
        if (len <= 0)
            return;
        assert(len <= TailIdleSize());
        _writer_idx += len;
    }
    // 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容
    void EnsureWriteSpace(uint64_t len)
    {
        // 当len小于写入位置之后的空闲空间 直接进行写入
        if (len <= TailIdleSize())
        {
            return;
        }

        // len 小于总共的空闲空间
        else if (len <= TailIdleSize() + HeadIdleSize())
        {
            // 记录总共的数据
            uint64_t sz = ReadAbleSize();
            // 移动数据
            std::copy(ReadPos(), ReadPos() + sz, Begin());
            // 更新写入读取位置
            _reader_idx = 0;
            _writer_idx = sz;
        }
        // 需要扩容
        else
        {
            // 在写入位置之后扩充len个大小
            _buffer.resize(_writer_idx + len);
        }
    }
    // ----------写入数据------------
    void Write(const void *data, uint64_t len)
    {
        // 写入的数据不能超过可写的总空间
        // 确保可以正常写入
        EnsureWriteSpace(len);
        // 进行拷贝
        const char *d = reinterpret_cast<const char *>(data);
        std::copy(d, d + len, WritePos());
    }
    // 写入Buffer
    void WriteBuffer(Buffer &buffer)
    {
        // 直接调用Write
        return Write(buffer.ReadPos(), buffer.ReadAbleSize());
    }
    // 写入字符串
    void WriteString(const std::string &str)
    {
        // 直接调用Write
        return Write(&str[0], str.size());
    }
    // 写入 + 偏移
    void WriteAndPush(const void *data, uint64_t len)
    {
        if (len <= 0)
            return;
        // 进行写入
        Write(data, len);
        // 更新写入偏移量
        MoveWriteOffset(len);
    }
    void WriteStringAndPush(const std::string &str)
    {

        // 直接调用Write
        WriteString(str);
        LOG(DEBUG, "%s\n", WritePos());
        // 更新偏移量
        MoveWriteOffset(str.size());
        LOG(DEBUG, "当前可读数据大小:%ld\n", ReadAbleSize());
    }
    void WriteBufferAndPush(Buffer &buffer)
    {
        // 直接调用Write
        Write(buffer.ReadPos(), buffer.ReadAbleSize());
        MoveWriteOffset(buffer.ReadAbleSize());
    }

    //---------读取数据----------
    void Read(void *buf, uint64_t len)
    {
        // len 必须小于可读数据大小
        assert(len <= ReadAbleSize());
        // 进行拷贝
        std::copy(ReadPos(), ReadPos() + len, (char *)buf);
    }
    // 读取字符串
    std::string ReadAsString(uint64_t len)
    {
        // len 必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        // 直接调用Read
        Read(&str[0], len);
        return str;
    }
    // 读取 + Pop
    void ReadAndPop(void *buf, uint64_t len)
    {
        // 进行读取
        Read(buf, len);
        // 更新读取偏移量
        MoveReadOffset(len);
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
        // len 必须小于可读数据大小
        assert(len <= ReadAbleSize());
        // 进行读取
        std::string str = ReadAsString(len);
        // 更新偏移量
        MoveReadOffset(len);
        return str;
    }

    //-----------读取一行数据--------------
    char *FindCRLF()
    {
        return (char *)memchr(ReadPos(), '\n', ReadAbleSize());
    }
    std::string GetLine()
    {
        // 先寻找换行符
        char *pos = FindCRLF();
        if (pos == nullptr)
            return "";
        // 读取要带走'\n'
        std::string str = ReadAsString(pos - ReadPos() + 1);
        return str;
    }
    std::string GetLineAndPop()
    {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }

    // 清除数据
    void Clear()
    {
        // 偏移量归零即可
        _reader_idx = 0;
        _writer_idx = 0;
    }
};

2.2 通用类型 Any类

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有明显的协议倾向,它可以是任意协议的上下文信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构

  1. 一个连接必须拥有一个请求接收与解析的上下文!
  2. 上下文的类型或者结构不能固定!因为服务器的协议支持的协议很多,不同的协议,可能都有不同的上下文结构!

所以必须拥有一个容器,能够保存各种不同的类型!那么就要实现一个any类

  1. 假如使用模版类方法,那么实例化对象的时候一定要指明容器保存的数据类型!而我们需要的是any可以接收任意类型Any a ; a = 10 ; a = "abc"!
  2. 但是可以嵌套一下,在Any类中设计一个类,专门用于保存各种类型的数据,而Any类保存的是固定类的对象。
  3. 对于这个固定类依旧不能使用模版。但这里这里可以采用多态,设计一个子类,这是一个模版类。
    这样可以通过父类指针读取子类数据!
  4. Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个
    特定类型的对象出来,让子类对象保存数据!
// 通用类型 Any类
class Any
{
private:
    class holder
    {
    public:
        holder() {}
        virtual ~holder() {};
        virtual const std::type_info &type() = 0;
        virtual holder *clone() = 0;
    };
    template <class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T &val) : _val(val) {}

        virtual const std::type_info &type() override
        {
            return typeid(T);
        }
        virtual holder *clone() override
        {
            return new placeholder(_val);
        }

        T _val;
    };
    // 通过这个父类指针访问子类的成员
    holder *_content = nullptr;

public:
    Any()
    {
    }
    template <class T>
    // 拷贝构造
    Any(const T &val) : _content(new placeholder<T>(val))
    {
    }
    // 拷贝构造
    Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
    // 交换数据
    Any &swap(Any &other)
    {
        std::swap(_content, other._content);
        return *this;
    }

    // 赋值重载
    Any &operator=(Any &other)
    {
        // 根据 other建立一个临时对象 ,进行资源的交换
        Any(other).swap(*this);
        return *this;
    }
    template <class T>
    Any &operator=(const T &val)
    {
        // 根据 val 建立一个临时对象,进行资源的交换
        Any(val).swap(*this);
        return *this;
    }
    template <class T>
    T *Get()
    {
        if (typeid(T) != _content->type())
            return nullptr;
        return &((reinterpret_cast<placeholder<T> *>(_content))->_val);
    }

    ~Any()
    {
        delete _content;
    }
};

2.3 套接字 Socket模块

Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:

  1. 构造函数 析构函数
  2. 创建套接字
    bool Create()
  3. 绑定地址信息
  4. 开始监听
  5. 向服务器发起连接
  6. 获取新连接
  7. 接收数据
  8. 发送数据
  9. 关闭套接字
  10. 创建一个服务端连接:首先创建套接字,然后将进程绑定地址信息,开启监听状态,注意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,一切等待都由多路转接负责);同时启动地址重用 ,保护客户端
  11. 创建一个客户端连接:创建套接字,连接服务器。
  12. 设置套接字选项 — 开启地址端口重用
  13. 设置套接字阻塞属性 — 设置为非阻塞读取
// 套接字Socket类
class Socket
{
private:
    int _sockfd; // 套接字文件描述符

private:
    // 创建套接字
    bool Create()
    {
        // int socket(int domain, int type, int protocol);
        // IPV4     数据流IO
        _sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (_sockfd < 0)
        {
            LOG(ERROR, "create socket failed!\n");
            return false;
        }
        LOG(INFO, "Sockfd:%d create success \n", _sockfd);
        return true;
    }
    // 绑定地址信息
    bool bind(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;                    // 使用IPv4版本地址
        addr.sin_port = htons(port);                  // 主机端口转网络端口
        addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
        // 进行绑定
        socklen_t len = sizeof(struct sockaddr_in);
        int n = ::bind(_sockfd, (struct sockaddr *)&addr, len);
        if (n < 0)
        {
            LOG(ERROR, "bind failed!\n");
            return false;
        }
        return true;
    }
    // 建立监听套接字
    bool Listen(int backlog = MAX_LIETENSIZE)
    {
        //                        全连接队列的大小
        int n = ::listen(_sockfd, backlog);
        if (n < 0)
        {
            LOG(ERROR, "listen failed!\n");
            return false;
        }
        return true;
    }
    // 向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;                    // 使用IPv4版本地址
        addr.sin_port = htons(port);                  // 主机端口转网络端口
        addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
        // 进行绑定
        socklen_t len = sizeof(struct sockaddr_in);
        int n = ::connect(_sockfd, (struct sockaddr *)&addr, len);
        if (n < 0)
        {
            LOG(ERROR, "connect failed!\n");
            return false;
        }
        return true;
    }

    // 非阻塞启动地址重用
    void ReuseAddress(uint16_t port, const std::string &ip)
    {
        int val = 1;
        socklen_t len = sizeof(int);
        // 将端口号设置为可重用
        if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, len) == -1)
        {
            throw std::runtime_error("Failed to set SO_REUSEPORT");
        }

        // 将IP地址设置为可重用
        if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, len) == -1)
        {
            throw std::runtime_error("Failed to set SO_REUSEADDR");
        }
    }
    void NonBlock()
    {
        int flag = ::fcntl(_sockfd, F_GETFL, 0);
        ::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }

public:
    // 构造函数
    Socket() : _sockfd(-1) {};
    Socket(int sockfd) : _sockfd(sockfd) {}
    // 析构函数
    ~Socket() { Close(); }
    // 返回套接字文件描述符
    int Sockfd() { return _sockfd; }

    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0)
    {
        ssize_t n = ::recv(_sockfd, buf, len, flag);
        if (n <= 0)
        {
            // 被信号中断         没有数据了- 非阻塞读取完毕
            if (errno == EINTR || errno == EAGAIN)
                return 0; // 没有读取到数据

            LOG(ERROR, "recv failed!\n");
            return -1;
        }
        return n; // 实际发送的长度
    }
    ssize_t NonBlockRecv(void *buf, size_t len)
    {
        //       MSG_DONTWAIT  非阻塞式读取
        if (len <= 0)
            return 0;
        return Recv(buf, len, MSG_DONTWAIT);
    }
    // 发送数据
    ssize_t Send(void *buf, size_t len, int flag = 0)
    {
        if (len <= 0)
            return 0;
        ssize_t n = ::send(_sockfd, buf, len, flag);
        if (n <= 0)
        {
            // 被信号中断         没有数据了- 非阻塞读取完毕
            if (errno == EINTR || errno == EAGAIN)
                return 0; // 没有读取到数据
            LOG(ERROR, "send failed!\n");
            return -1;
        }
        LOG(DEBUG, "Send :%s\n", (char *)buf);
        return n; // 实际发送的数据大小
    }
    ssize_t NonBlockSend(void *buf, size_t len)
    {
        //       MSG_DONTWAIT  非阻塞式发送
        if (len <= 0)
            return 0;
        return Send(buf, len, MSG_DONTWAIT);
    }

    // 创建服务端套接字
    bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = 0)
    {

        // 创建套接字
        if (Create() == false)
            return false;
        // 设置为非阻塞
        if (flag)
            NonBlock();
        // 将地址与端口设置为可重用
        ReuseAddress(port, ip);
        // 绑定地址信息
        if (bind(ip, port) == false)
            return false;
        // 进行监听
        if (Listen() == false)
            return false;

        return true;
    }
    // 创建客户端套接字
    bool CreateClient(uint16_t port, const std::string &ip)
    {
        // 创建套接字
        if (Create() == false)
            return false;
        // 连接服务器
        if (Connect(ip, port) == false)
            return false;
        return true;
    }
    // 服务器获取连接
    int Accept()
    {
        int newfd = ::accept(_sockfd, nullptr, nullptr);
        if (newfd < 0)
        {
            LOG(ERROR, "accept failed!\n");
            return -1;
        }
        return newfd;
    }
    //  关闭套接字
    void Close()
    {
        LOG(INFO, "Close sockfd: %d\n", _sockfd);
        if (_sockfd != -1)
        {
            ::close(_sockfd);
        }
    }
};

原文地址:https://blog.csdn.net/JLX_1/article/details/143987150

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!