【项目日记】仿mudou的高并发服务器 --- 实现缓冲区模块,通用类型Any模块,套接字模块
✨✨✨项目地址在这里 ✨✨✨
✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨
1 主从Reactor模型
这个项目的目标是实现一个可以高效处理请求的服务器,那么对于这样的一个服务器要如何实现呢?
这里采取主从Reactor模型的方案:
- 主Reactor模型负责对监听套接字进行管理,进行获取连接操作。
- 从属Reactor负责对连接的数据进行处理,并且为了保证线程安全,每一个从属Reactor都要绑定一个线程,这个连接的任务都在这个线程中完成。
简单来说就是这样的一个模型:
其中管理连接的对象是Connection对象,这个类是高并发服务器的核心部分,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!
主要的工作就是搭建起主从Reactor模型,但是这个模型不是一下子就可以写出来的。为了实现主从Reactor模型,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模型!
2 基础功能封装
2.1 缓冲区 Buffer模块
Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:
- 需要支持字符串读取/写入
- 需要支持
char*
缓冲区读取/写入 - 需要正常按行读取 — 方便解析http请求
Buffer模块成员变量很简单:
vector<char>
容器_buffer: 对内存空间进行管理uint64_t _reader_idx
读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。uint64_t _writer_idx
写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。
接下来实现一下基础功能:
- 构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个大小 BUFFER_DEFAULT_SIZE。
- 获取当前写入起始地址:_buffer空间的起始地址加入写偏移量即写入起始地址。
- 获取当前读取起始地址: _buffer空间的起始地址加入读偏移量即读取起始地址。
- 获取缓冲区末尾空闲空间大小:写偏移之后的空闲空间 ,总体空间大小减去写偏移就是写偏移之后的空间大小。
- 获取缓冲区起始空闲空间大小:读偏移之前的空闲空间,其实就是读偏移的大小。
- 获取可读数据大小:写偏移减去读偏移就就之间可读空间的大小!
- 读/写偏移向后移动:
* 先根据len
判断是否小于可读数据大小len
必须小于可读数据大小,然后移动读偏移。
* 向后移动必须小于当前后边的空闲空间大小,写入数据必须小于缓冲区剩余空间大小,不足就进行扩容! - 确保可写空间足够 :
* 末尾空闲空间足够 直接返回。
* 末尾空间不足,但算上起始位置的空闲空间大小足够 ,将数据移动到起始位置。
* 如果总空间不足 ,进行扩容,扩容到足够空间即可 - 写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续设计出针对string的写入、针对Buffer的写入以及
写入 + 压入数据
- 读取数据:首先读取大小
len
必须小于可读取数据大小,然后拷贝数据出来。同样可以设计出针对string的读取、针对Buffer的读取以及读取+弹出数据
。 - 清空缓冲区:将偏移量归零即可!
- 读取一行数据:先找到换行符,然后进行读取。
// 缓冲区Buffer类
class Buffer
{
private:
std::vector<char> _buffer;
uint64_t _reader_idx;
uint64_t _writer_idx;
private:
char *Begin() { return &*_buffer.begin(); }
public:
Buffer() : _buffer(BUFFER_DEFAULT_SIZE), _reader_idx(0), _writer_idx(0) {}
// 获取当前写入起始地址
char *WritePos() { return Begin() + _writer_idx; }
// 获取当前读取起始地址
char *ReadPos() { return Begin() + _reader_idx; }
// 获取读取位置之前的空闲空间
uint64_t HeadIdleSize() { return _reader_idx; }
// 获取写入位置之后的空闲空间
uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
// 获取可读数据大小
uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }
// 读/写偏移移动
void MoveReadOffset(uint64_t len)
{
if (len <= 0)
return;
assert(len <= ReadAbleSize());
_reader_idx += len;
}
void MoveWriteOffset(uint64_t len)
{
if (len <= 0)
return;
assert(len <= TailIdleSize());
_writer_idx += len;
}
// 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容
void EnsureWriteSpace(uint64_t len)
{
// 当len小于写入位置之后的空闲空间 直接进行写入
if (len <= TailIdleSize())
{
return;
}
// len 小于总共的空闲空间
else if (len <= TailIdleSize() + HeadIdleSize())
{
// 记录总共的数据
uint64_t sz = ReadAbleSize();
// 移动数据
std::copy(ReadPos(), ReadPos() + sz, Begin());
// 更新写入读取位置
_reader_idx = 0;
_writer_idx = sz;
}
// 需要扩容
else
{
// 在写入位置之后扩充len个大小
_buffer.resize(_writer_idx + len);
}
}
// ----------写入数据------------
void Write(const void *data, uint64_t len)
{
// 写入的数据不能超过可写的总空间
// 确保可以正常写入
EnsureWriteSpace(len);
// 进行拷贝
const char *d = reinterpret_cast<const char *>(data);
std::copy(d, d + len, WritePos());
}
// 写入Buffer
void WriteBuffer(Buffer &buffer)
{
// 直接调用Write
return Write(buffer.ReadPos(), buffer.ReadAbleSize());
}
// 写入字符串
void WriteString(const std::string &str)
{
// 直接调用Write
return Write(&str[0], str.size());
}
// 写入 + 偏移
void WriteAndPush(const void *data, uint64_t len)
{
if (len <= 0)
return;
// 进行写入
Write(data, len);
// 更新写入偏移量
MoveWriteOffset(len);
}
void WriteStringAndPush(const std::string &str)
{
// 直接调用Write
WriteString(str);
LOG(DEBUG, "%s\n", WritePos());
// 更新偏移量
MoveWriteOffset(str.size());
LOG(DEBUG, "当前可读数据大小:%ld\n", ReadAbleSize());
}
void WriteBufferAndPush(Buffer &buffer)
{
// 直接调用Write
Write(buffer.ReadPos(), buffer.ReadAbleSize());
MoveWriteOffset(buffer.ReadAbleSize());
}
//---------读取数据----------
void Read(void *buf, uint64_t len)
{
// len 必须小于可读数据大小
assert(len <= ReadAbleSize());
// 进行拷贝
std::copy(ReadPos(), ReadPos() + len, (char *)buf);
}
// 读取字符串
std::string ReadAsString(uint64_t len)
{
// len 必须小于可读数据大小
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
// 直接调用Read
Read(&str[0], len);
return str;
}
// 读取 + Pop
void ReadAndPop(void *buf, uint64_t len)
{
// 进行读取
Read(buf, len);
// 更新读取偏移量
MoveReadOffset(len);
}
std::string ReadAsStringAndPop(uint64_t len)
{
// len 必须小于可读数据大小
assert(len <= ReadAbleSize());
// 进行读取
std::string str = ReadAsString(len);
// 更新偏移量
MoveReadOffset(len);
return str;
}
//-----------读取一行数据--------------
char *FindCRLF()
{
return (char *)memchr(ReadPos(), '\n', ReadAbleSize());
}
std::string GetLine()
{
// 先寻找换行符
char *pos = FindCRLF();
if (pos == nullptr)
return "";
// 读取要带走'\n'
std::string str = ReadAsString(pos - ReadPos() + 1);
return str;
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
// 清除数据
void Clear()
{
// 偏移量归零即可
_reader_idx = 0;
_writer_idx = 0;
}
};
2.2 通用类型 Any类
每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有明显的协议倾向,它可以是任意协议的上下文信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构:
- 一个连接必须拥有一个请求接收与解析的上下文!
- 上下文的类型或者结构不能固定!因为服务器的协议支持的协议很多,不同的协议,可能都有不同的上下文结构!
所以必须拥有一个容器,能够保存各种不同的类型!那么就要实现一个any类
- 假如使用模版类方法,那么实例化对象的时候一定要指明容器保存的数据类型!而我们需要的是any可以接收任意类型
Any a ; a = 10 ; a = "abc"
! - 但是可以嵌套一下,在Any类中设计一个类,专门用于保存各种类型的数据,而Any类保存的是固定类的对象。
- 对于这个固定类依旧不能使用模版。但这里这里可以采用多态,设计一个子类,这是一个模版类。
这样可以通过父类指针读取子类数据! - Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个
特定类型的对象出来,让子类对象保存数据!
// 通用类型 Any类
class Any
{
private:
class holder
{
public:
holder() {}
virtual ~holder() {};
virtual const std::type_info &type() = 0;
virtual holder *clone() = 0;
};
template <class T>
class placeholder : public holder
{
public:
placeholder(const T &val) : _val(val) {}
virtual const std::type_info &type() override
{
return typeid(T);
}
virtual holder *clone() override
{
return new placeholder(_val);
}
T _val;
};
// 通过这个父类指针访问子类的成员
holder *_content = nullptr;
public:
Any()
{
}
template <class T>
// 拷贝构造
Any(const T &val) : _content(new placeholder<T>(val))
{
}
// 拷贝构造
Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
// 交换数据
Any &swap(Any &other)
{
std::swap(_content, other._content);
return *this;
}
// 赋值重载
Any &operator=(Any &other)
{
// 根据 other建立一个临时对象 ,进行资源的交换
Any(other).swap(*this);
return *this;
}
template <class T>
Any &operator=(const T &val)
{
// 根据 val 建立一个临时对象,进行资源的交换
Any(val).swap(*this);
return *this;
}
template <class T>
T *Get()
{
if (typeid(T) != _content->type())
return nullptr;
return &((reinterpret_cast<placeholder<T> *>(_content))->_val);
}
~Any()
{
delete _content;
}
};
2.3 套接字 Socket模块
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:
- 构造函数 析构函数
- 创建套接字
bool Create() - 绑定地址信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个服务端连接:首先创建套接字,然后将进程绑定地址信息,开启监听状态,注意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,一切等待都由多路转接负责);同时启动地址重用 ,保护客户端
- 创建一个客户端连接:创建套接字,连接服务器。
- 设置套接字选项 — 开启地址端口重用
- 设置套接字阻塞属性 — 设置为非阻塞读取
// 套接字Socket类
class Socket
{
private:
int _sockfd; // 套接字文件描述符
private:
// 创建套接字
bool Create()
{
// int socket(int domain, int type, int protocol);
// IPV4 数据流IO
_sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0)
{
LOG(ERROR, "create socket failed!\n");
return false;
}
LOG(INFO, "Sockfd:%d create success \n", _sockfd);
return true;
}
// 绑定地址信息
bool bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 使用IPv4版本地址
addr.sin_port = htons(port); // 主机端口转网络端口
addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
// 进行绑定
socklen_t len = sizeof(struct sockaddr_in);
int n = ::bind(_sockfd, (struct sockaddr *)&addr, len);
if (n < 0)
{
LOG(ERROR, "bind failed!\n");
return false;
}
return true;
}
// 建立监听套接字
bool Listen(int backlog = MAX_LIETENSIZE)
{
// 全连接队列的大小
int n = ::listen(_sockfd, backlog);
if (n < 0)
{
LOG(ERROR, "listen failed!\n");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 使用IPv4版本地址
addr.sin_port = htons(port); // 主机端口转网络端口
addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
// 进行绑定
socklen_t len = sizeof(struct sockaddr_in);
int n = ::connect(_sockfd, (struct sockaddr *)&addr, len);
if (n < 0)
{
LOG(ERROR, "connect failed!\n");
return false;
}
return true;
}
// 非阻塞启动地址重用
void ReuseAddress(uint16_t port, const std::string &ip)
{
int val = 1;
socklen_t len = sizeof(int);
// 将端口号设置为可重用
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, len) == -1)
{
throw std::runtime_error("Failed to set SO_REUSEPORT");
}
// 将IP地址设置为可重用
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, len) == -1)
{
throw std::runtime_error("Failed to set SO_REUSEADDR");
}
}
void NonBlock()
{
int flag = ::fcntl(_sockfd, F_GETFL, 0);
::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
public:
// 构造函数
Socket() : _sockfd(-1) {};
Socket(int sockfd) : _sockfd(sockfd) {}
// 析构函数
~Socket() { Close(); }
// 返回套接字文件描述符
int Sockfd() { return _sockfd; }
// 接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0)
{
ssize_t n = ::recv(_sockfd, buf, len, flag);
if (n <= 0)
{
// 被信号中断 没有数据了- 非阻塞读取完毕
if (errno == EINTR || errno == EAGAIN)
return 0; // 没有读取到数据
LOG(ERROR, "recv failed!\n");
return -1;
}
return n; // 实际发送的长度
}
ssize_t NonBlockRecv(void *buf, size_t len)
{
// MSG_DONTWAIT 非阻塞式读取
if (len <= 0)
return 0;
return Recv(buf, len, MSG_DONTWAIT);
}
// 发送数据
ssize_t Send(void *buf, size_t len, int flag = 0)
{
if (len <= 0)
return 0;
ssize_t n = ::send(_sockfd, buf, len, flag);
if (n <= 0)
{
// 被信号中断 没有数据了- 非阻塞读取完毕
if (errno == EINTR || errno == EAGAIN)
return 0; // 没有读取到数据
LOG(ERROR, "send failed!\n");
return -1;
}
LOG(DEBUG, "Send :%s\n", (char *)buf);
return n; // 实际发送的数据大小
}
ssize_t NonBlockSend(void *buf, size_t len)
{
// MSG_DONTWAIT 非阻塞式发送
if (len <= 0)
return 0;
return Send(buf, len, MSG_DONTWAIT);
}
// 创建服务端套接字
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = 0)
{
// 创建套接字
if (Create() == false)
return false;
// 设置为非阻塞
if (flag)
NonBlock();
// 将地址与端口设置为可重用
ReuseAddress(port, ip);
// 绑定地址信息
if (bind(ip, port) == false)
return false;
// 进行监听
if (Listen() == false)
return false;
return true;
}
// 创建客户端套接字
bool CreateClient(uint16_t port, const std::string &ip)
{
// 创建套接字
if (Create() == false)
return false;
// 连接服务器
if (Connect(ip, port) == false)
return false;
return true;
}
// 服务器获取连接
int Accept()
{
int newfd = ::accept(_sockfd, nullptr, nullptr);
if (newfd < 0)
{
LOG(ERROR, "accept failed!\n");
return -1;
}
return newfd;
}
// 关闭套接字
void Close()
{
LOG(INFO, "Close sockfd: %d\n", _sockfd);
if (_sockfd != -1)
{
::close(_sockfd);
}
}
};
原文地址:https://blog.csdn.net/JLX_1/article/details/143987150
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!