9、核心:共享内存的基础结构和动态增长实现(续)
我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。
这些代码大部分以Linux为目标但部分代码是纯C++的,可以在任何平台上使用。
四、关于meta
meta对于数据格式识别很重要,因为对于跨主机的数据传输,字节序、数据长度、版本都是很重要的,meta里面存储了所有这些信息。
程序连接到一个外来的数据时,必须校验格式,否则,一定会出大事。
不过呢,很多大厂啊(不是针对谁,你写了我就不是说你),写的协议里面连版本都没有的。
这个模板在meta里面把记录长度、版本都放进去了,meta能匹配说明数据差不多是对的,另外还有共享内存的名字,也会在连接时匹配。
这些是经验。
五、自动增长
自动增长的实现以添加数据为入口,添加数据首先会确保容量足够,STL里面叫做reserve。
容量不够就触发扩展操作,申请一块新的共享内存,添加到分块表里面。
代码比较复杂:
bool Reserve(T_SHM_SIZE _n, T_SHM_SIZE min_block_n = 0)
{
if (pHead->capacity >= _n)return true;
T_SHM_SIZE new_size = (pHead->capacity >= 1000 * 1000 * 10 ? pHead->capacity / 5 : pHead->capacity);//增加的容量,先设置为默认的比例
if (new_size < _n - pHead->capacity)new_size = _n - pHead->capacity;
if (new_size < 1)new_size = 1;
if (new_size < min_block_n)new_size = min_block_n;
if (1 == sizeof(T) && new_size > 8 && 0 != new_size % 8)
{
new_size = (new_size / 8 + 1) * 8;
thelog << "T=char记录数不为8的倍数,需修正为" << new_size << endi;
}
thelog << PI_N << " 容量 " << pHead->capacity << " 申请容量 " << _n << " 最小新块 " << min_block_n << " 实际申请 " << new_size << " 扩展后容量 " << pHead->capacity + new_size << endi;
if (!isPrivate)
{//共享内存扩展,附加一个共享内存块,不连续
if (NULL != GET_PP_LRU(PI_N) || name.substr(1, 3) == "LOG")
{
thelog << "LRU缓存和LOG*命名的共享内存" << ende;
return false;
}
#ifdef SHM_ALLOCATOR_USE_OLD_POINTER
if (SHM_NAME_SHMPOOL == name))
{
thelog << "老式共享内存池不可扩展" << ende;
return false;
}
#endif
if (pHead->vmaps.size >= T_ARRAY_VMAP_MAX_SIZE)
{
thelog << "共享内存扩展次数已经达到最大,不能扩展" << ende;
return false;
}
if (pHead->vmaps.size >= T_ARRAY_VMAP_MAX_SIZE - 5)
{
thelog << "注意:共享内存扩展次数已经达到 " << pHead->vmaps.size << " ,最大扩展次数为 " << T_ARRAY_VMAP_MAX_SIZE << " ,请整理共享内存并修正初始大小设置" << endw;
}
int new_shmid;
while ((new_shmid = CShmMan::CreatePrivateSHM(sizeof(T) * new_size)) < 0)
{
thelog << "创建共享内存失败 申请的大小 " << sizeof(T) * new_size << " 通常是因为系统内存中没有足够的连续物理内存 或者超过了系统限制" << endw;
new_size /= 2;
if (new_size > 1 && new_size >= min_block_n)
{
thelog << "尝试申请更小的大小 " << sizeof(T) * new_size << endi;
continue;
}
else
{
thelog << "创建共享内存失败.不能申请所需的最小共享内存" << ende;
return false;
}
}
thelog << "创建新共享内存成功,id = " << new_shmid << endi;
char* p = CShmMan::ConnectByID(new_shmid, false);
if (NULL == p)
{
thelog << PI_N << " 连接新共享内存失败 错误信息:" << strerror(errno) << ende;
return false;
}
//设置地址映射表
GET_SHM_PRIVATE_DATA(PI_N).shm_addr_map[pHead->vmaps.size].first = new_shmid;
GET_SHM_PRIVATE_DATA(PI_N).shm_addr_map[pHead->vmaps.size].second = p;
++GET_SHM_PRIVATE_DATA(PI_N).addr_map_size;
//设置分块影射表
struct_T_ARRAY_VMAP* pvmap = &pHead->vmaps.m_vmaps[pHead->vmaps.size];
pvmap->shm_id = new_shmid;
pvmap->handle_begin = pHead->capacity;
pvmap->handle_end = pvmap->handle_begin + new_size;
++pHead->vmaps.size;
//设置新的总容量
pHead->capacity += new_size;
}
else
{//私有内存扩展,保持为一个连续的块
char* tmp = new char[_CalcShmSize(pHead->capacity + new_size)];
if (NULL == tmp)
{
return false;
}
memcpy(tmp, pHead, _CalcShmSize(pHead->capacity));
delete[](char*)pHead;
pHead = (array_head*)tmp;
pData = (T*)(tmp + sizeof(array_head));
pHead->capacity = pHead->capacity + new_size;
_InitFirstSPD();
}
string str;
thelog << ReportHead(str) << endi;
return true;
}
代码比较复杂是因为考虑了众多情形:
- 每次增长的大小是经过了计算的,不能一个一个增长(系统不支持这么多共享内存块)
- 申请的长度修正为8的倍数(能避免一些奇怪的问题,如果你有一些奇怪的想法)
- 如果申请失败,尝试申请更小的大小
- 提供一些告警
六、从文件加载
整个共享内存可以保存为一个磁盘文件,然后快速恢复,也可以传输给别的主机使用(字节序!因为我用的主机字节序都一样,所以没有处理字节序)。
加载的过程相当繁琐,因为要保证格式正确。(保存的代码则简单多了,无脑写即可)
代码如下:
bool _LoadFromFile(bool toShm, char const* file)
{
if (NULL != this->pHead)
{
thelog << "数据指针不为空,不能用已经含有数据的对象做数据加载" << ende;
return false;
}
if (toShm)
{
_DestoryShm();
}
bool isNewshm = true;//是否创建了新共享内存,若存在的空间够大就不创建
ifstream f;
f.open(file);
if (!f.good())
{
thelog << "打开文件失败 " << file << ende;
return false;
}
f.seekg(0, ios::end);
long len = f.tellg();
thelog << "文件长度 " << len << endi;
//先读取元数据分析,再读取文件头,最后读取数据区
long file_headsize;//文件实际的头长度,版本不同可能不同
CMeta file_meta;//文件的meta
f.seekg(0, ios::beg);
f.read((char*)&file_meta, sizeof(CMeta));
if (!f.good() || f.gcount() != sizeof(CMeta))
{
thelog << "读文件失败 " << file << ende;
f.close();
return false;
}
file_headsize = file_meta.GetInt(0);
CMeta tmp_meta;
string msg;
_makemeta(tmp_meta, version);
if (!tmp_meta.Compare(file_meta, msg))
{
thelog << "元数据格式不匹配" << endi;
if (!file_meta.CheckGuid((signed char const*)&tmp_meta))
{
thelog << "文件的元数据GUID错误" << ende;
f.close();
return false;
}
if (!file_meta.CheckSys())
{
thelog << "文件的元数据格式信息错误,可能不是64位系统或字节序不同" << ende;
f.close();
return false;
}
if (file_headsize == sizeof(array_head))
{
}
else if (file_headsize == sizeof(array_head_old))
{
thelog << "识别为旧版本数据" << endi;
}
else if (file_headsize == sizeof(array_head_new))
{
thelog << "识别为新版本数据" << endi;
}
else
{
thelog << "文件的元数据文件头长度错误," << file_headsize << " 预期 " << sizeof(array_head_new) << " 或" << sizeof(array_head_old) << ende;
f.close();
return false;
}
if (file_meta.GetInt(1) != sizeof(T_USER_HEAD))
{
thelog << "文件的元数据USER_HEAD长度错误," << file_meta.GetInt(1) << " 预期 " << sizeof(T_USER_HEAD) << ende;
f.close();
return false;
}
if (file_meta.GetInt(2) != sizeof(T))
{
thelog << "文件的元数据T长度错误," << file_meta.GetInt(2) << " 预期 " << sizeof(T) << ende;
f.close();
return false;
}
if (file_meta.GetInt(3) != version)
{
thelog << "文件的元数据version错误," << file_meta.GetInt(3) << " 预期 " << version << ende;
f.close();
return false;
}
}
array_head new_head;//实际使用的头结构,定义为新结构或旧结构,若长度不相同,必然是与文件不同的另一个结构
char file_head[file_headsize];//文件的头结构
f.seekg(0, ios::beg);
f.read(file_head, file_headsize);
if (!f.good() || f.gcount() != file_headsize)
{
thelog << "读文件失败 " << file << ende;
f.close();
return false;
}
if (file_headsize == sizeof(array_head))
{
memcpy((void*)&new_head, file_head, file_headsize);
}
else if (file_headsize == sizeof(array_head_old))
{
array_head_old* p = (array_head_old*)file_head;
_makemeta(new_head.meta, version);
new_head.name = p->name.c_str();
new_head.capacity = p->capacity;
new_head.size = p->size;
new_head.vmaps = p->vmaps;
new_head.____ = p->____;
new_head.userhead = p->userhead;
}
else if (file_headsize == sizeof(array_head_new))
{
array_head_new* p = (array_head_new*)file_head;
_makemeta(new_head.meta, version);
new_head.name = p->name.c_str();
new_head.capacity = p->capacity;
new_head.size = p->size;
new_head.vmaps = p->vmaps;
new_head.____ = p->____;
new_head.userhead = p->userhead;
}
else
{
thelog << "未识别的头长度" << ende;
f.close();
return false;
}
T_SHM_SIZE r_size = -1;//数据库配置的大小
//获取配置的大小
ShmRegInfo tmpreg(GetShmSysOfName(name.c_str()), name.c_str(), PART);
if (tmpreg.GetConfigSize(r_size))
{
thelog << GetShmSysOfName(name.c_str()) << " " << name << " 配置的最大记录数 " << r_size << endi;
//如果数据库配置了大小并且比文件大则使用数据库配置的大小
if (_CalcShmSize(r_size) > len)
{
thelog << "配置的大小比文件大,使用配置的大小" << endi;
}
else
{
thelog << "配置的大小比文件小,使用文件的大小" << endi;
}
}
else
{
r_size = -1;
thelog << GetShmSysOfName(name.c_str()) << " " << name << " 未配置的最大记录数,使用文件的大小" << endi;
}
//创建
if (toShm)
{
char* p = _CreateShmIfNeed((len > _CalcShmSize(r_size) ? len : _CalcShmSize(r_size)), isNewshm);
if (NULL == p)
{
thelog << "无法获得共享内存" << ende;
return false;
}
this->pHead = (array_head*)p;
}
else
{
this->pHead = (array_head*)new char[len > _CalcShmSize(r_size) ? len : _CalcShmSize(r_size)];
if (NULL == this->pHead)
{
thelog << "内存不足" << ende;
f.close();
return false;
}
}
memcpy((void*)this->pHead, &new_head, sizeof(array_head));
this->pData = (T*)(((char*)this->pHead) + sizeof(array_head));
this->isPrivate = (!toShm);
long datalen = len - file_headsize;
long count = 100 * 1024 * 1024;
long i = 0;
f.seekg(file_headsize, ios::beg);
for (; i < datalen; i += count)
{
count = (i + count <= datalen ? count : datalen - i);
f.read(((char*)this->pData) + i, count);
if (!f.good() || f.gcount() != count)
{
thelog << "读文件失败 " << file << ende;
f.close();
return false;
}
}
f.close();
thelog << "读取完成 " << i << endi;
//检查内容
if (!_CheckMeta() || pHead->name != name)
{
thelog << "数据格式或名称不匹配" << ende;
return false;
}
if (datalen != (long)(pHead->capacity * sizeof(T)))
{
thelog << "数据大小 " << datalen << " 与容量计算 " << pHead->capacity * sizeof(T) + sizeof(array_head) << " 不匹配" << ende;
return false;
}
if (r_size > 0 && pHead->capacity < r_size)
{
pHead->capacity = r_size;//修正实际的容量
}
if (toShm)
{
if (isNewshm)
{
//注册共享内存,注册的是实际大小,可能大于申请的大小
ShmRegInfo tmpreg(GetShmSysOfName(name.c_str()), name.c_str(), PART);
tmpreg.shmid = shmid;
if (!CShmMan::GetState(shmid, tmpreg.segsz, tmpreg.ctime))
{
thelog << "获取共享内存失败" << ende;
return false;
}
if (!tmpreg.SaveRegToDb())return false;
}
}
_InitFirstSPD();
return true;
}
功夫都在细节处。
(这里是结束)
原文地址:https://blog.csdn.net/2301_77171572/article/details/140514402
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!