自学内容网 自学内容网

【文件锁】多进程线程安全访问文件demo

组合文件锁+共享锁,并RAII 化,保证文件的跨进程线程读写安全。
demo模拟使用多个进程,每个进程包含多个线程对文件进行读写测试。
代码调用开源json库,需要下载到调试机器,编译时手动指定:

g++ -std=c++17 -pthread dbug.cpp  -I/root/json/include 
#include <cstddef>
#include <cstdlib>
#include <unistd.h>
#include <fstream>
#include <sys/stat.h>
#include <filesystem>
#include <iomanip>

#include <sys/wait.h>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <unordered_map>
#include <memory>
#include <string>
#include <fcntl.h>
#include <sys/file.h> // 包含文件锁相关的头文件
#include <sys/types.h>
#include <sys/wait.h>
#include <mutex>  // 包含mutex头文件
#include <shared_mutex>
#include <nlohmann/json.hpp>  // 引入 JSON 库
#include <stdexcept> // 包含标准异常类

// #include <memory>
/* 写json文件 */
using json = nlohmann::json;
namespace fs = std::filesystem;
void writeToJsonFile(const std::string &filename, const json &data, int maxRetries = 3,
                     int retryDelayMilliseconds = 100);
/* 读json文件 */
json readJsonFromFile(const std::string &filename, int maxRetries = 3,
                      int retryDelayMs = 100);

class FileLock {
//创建双重锁,文件锁实现进程同步,共享锁实现线程同步,设置为配套获取与释放
private:
    std::string fileName;
    int fileDesc;
    mutable std::shared_mutex rwLock; // 实现线程同步
public:
    FileLock(const std::string& file) : fileName(file) {
        // 打开文件(如果文件不存在则创建)
        fileDesc = open(fileName.c_str(), O_CREAT | O_RDWR, 0666);
        if (fileDesc == -1) {
            throw std::runtime_error("Failed to open file for locking: " + fileName);
        }
    }

    ~FileLock() {
        if (fileDesc != -1) {
            close(fileDesc);  // 关闭文件描述符
        }
    }

    // 禁止复制构造函数和赋值运算符
    FileLock(const FileLock&) = delete;
    FileLock& operator=(const FileLock&) = delete;

    // 锁定文件进行读取
    void lockRead() {
        rwLock.lock_shared(); // 获取共享锁(读锁)
        if (flock(fileDesc, LOCK_SH) == -1) { // 获取共享锁(读锁)
            // rwLock.unlock_shared(); // 释放共享锁
            throw std::runtime_error("Failed to lock file for reading: " + fileName);
        }
    }

    // 释放文件读取锁
    void unlockRead() {
        if (flock(fileDesc, LOCK_UN) == -1) { // 释放锁
            throw std::runtime_error("Failed to unlock file after reading: " + fileName);
        }
        rwLock.unlock_shared(); // 释放共享锁
    }

    // 锁定文件进行写入
    void lockWrite() {
        rwLock.lock(); // 获取独占锁(写锁)
        if (flock(fileDesc, LOCK_EX) == -1) { // 获取独占锁(写锁)
            // rwLock.unlock(); // 释放独占锁
            throw std::runtime_error("Failed to lock file for writing: " + fileName);
        }
    }

    // 释放文件写入锁
    void unlockWrite() {
        if (flock(fileDesc, LOCK_UN) == -1) { // 释放锁
            throw std::runtime_error("Failed to unlock file after writing: " + fileName);
        }
        rwLock.unlock(); // 释放独占锁
    }
};

class FileManager {
private:
    // std::unordered_map<std::string, std::shared_ptr<FileLock>> fileLocks;
    std::unordered_map<std::string, FileLock> fileLocks;
    std::mutex managerMtx; // 用于保护 fileLocks 的互斥锁

    // 私有构造函数,禁止外部创建实例
    FileManager() = default;

    // 删除拷贝构造函数和赋值运算符
    FileManager(const FileManager&) = delete;
    FileManager& operator=(const FileManager&) = delete;

public:
    // 获取单例实例
    static FileManager& getInstance() {
        static FileManager instance; // 线程安全的局部静态变量(C++11 及以上)
        return instance;
    }

    // 获取指定文件的锁
    FileLock& getFileLock(const std::string& fileName) {

    // std::shared_ptr<FileLock> getFileLock(const std::string& fileName) {
        std::lock_guard<std::mutex> guard(managerMtx);
        // // std::cout << "getFileLock:" << fileName << std::endl;
        // if (fileLocks.find(fileName) == fileLocks.end()) {
        //     // 如果该文件锁不存在,创建一个新的锁对象
        //     fileLocks[fileName] = FileLock(fileName); // 永久保存FileLock,减少调用
        //     // fileLocks[fileName] = std::make_shared<FileLock>(fileName);//动态释放FileLock
        // }
        // return fileLocks[fileName];
        auto it = fileLocks.find(fileName);
        if (it == fileLocks.end()) {
            // 如果该文件锁不存在,创建一个新的锁对象并插入到 map 中
            it = fileLocks.emplace(fileName, fileName).first;
        }
        return it->second;
    }

    // 移除指定文件的锁(可选)
    void removeFileLock(const std::string& fileName) {
        std::lock_guard<std::mutex> guard(managerMtx);
        fileLocks.erase(fileName);
    }
};
//创建单例锁管理,永久存储文件锁
FileManager& manager = FileManager::getInstance();

//将双重锁 RAII 化, 用于自动管理 FileLock 的写锁
class WriteLockGuard {
public:
    WriteLockGuard(FileLock& fileLock) : fileLock(fileLock) {
        fileLock.lockWrite(); // 加锁
    }

    ~WriteLockGuard() {
        fileLock.unlockWrite(); // 自动解锁
    }

private:
    FileLock& fileLock;
};

//将双重锁 RAII 化, 用于自动管理 FileLock 的读锁
class ReadLockGuard {
public:
    ReadLockGuard(FileLock& fileLock) : fileLock(fileLock) {
        fileLock.lockRead(); // 加锁
    }

    ~ReadLockGuard() {
        fileLock.unlockRead(); // 自动解锁
    }

private:
    FileLock& fileLock;
};

/*******************************************************************************
 * 名称: checkJsonFile
 * 描述: 创建json文件
 * 作者: mkx
 * 参数: void
 * 返回: void
 ******************************************************************************/
void checkJsonFile(const std::string &fileName) {
    try
    {
        // 检查文件是否存在
        if (fs::exists(fileName)) {
            return;
        }
        // 获取文件所在目录
        fs::path filePath(fileName);
        fs::path directory = filePath.parent_path();

        // std::cout << "filePath:" << filePath << std::endl;
        // std::cout << "directory:" << directory << std::endl;

        // 如果目录不存在,则创建目录
        if (!directory.empty() && !fs::exists(directory)) {
                // std::cout << "创建目录: " << directory << std::endl;
            // debug_log(DLOG_INFO, "Directory created: %s", directory.string().c_str());
            if (!fs::create_directories(directory)) {
                std::cerr << "无法创建目录: " << directory << std::endl;
                // debug_log(DLOG_ERROR, "Directory created fail: %s", directory.string().c_str());
                return;
            }
        }

        FileLock& fileLock = manager.getFileLock(fileName);//以文件名为KEY获取对应锁
        //WriteLockGuard加锁时会自动创建文件
        WriteLockGuard lock(fileLock); // 创建 WriteLockGuard 对象,自动加锁
        if (fs::exists(fileName)) {
            // debug_log(DLOG_INFO, "File created: %s", filePath.string().c_str());
            std::cout << "文件创建成功: " << std::endl;
            return;
        }
    }
    catch (const std::filesystem::filesystem_error &e)
    {
        const std::string &errorMessage = e.what();
        // debug_log(DLOG_ERROR, "Error: %s", errorMessage.c_str());
    }
    catch (const std::exception &e)
    {
        const std::string &errorMessage = e.what();
        // debug_log(DLOG_ERROR, "Error: %s", errorMessage.c_str());
    }
}

/*******************************************************************************
 * 名称: writeToJsonFile
 * 描述: 写json文件
 * 作者: mkx
 * 参数: void
 * 返回: void
 ******************************************************************************/
void writeToJsonFile(const std::string &filename, const json &data, int maxRetries,
                     int retryDelayMilliseconds)
{
    checkJsonFile(filename);// 内有双重锁!
    FileLock& fileLock = manager.getFileLock(filename);//以文件名为KEY获取对应锁
    for (int retryCount = 0; retryCount < maxRetries; ++retryCount)
    {
        try
        {
            WriteLockGuard lock(fileLock); // 创建 WriteLockGuard 对象,自动加锁
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::ofstream outputFile(filename); //覆盖写入
            if (outputFile.is_open()) {
                // 将 JSON 数据写入文件,格式化输出(缩进为4个空格)
                std::cerr  << data.dump(4) << std::endl;
                outputFile << data.dump(4) << std::endl; // 换行
                outputFile.close(); 
                return;
            }
            else
            {
                throw std::ios_base::failure("Failed to open file for writing.");
            }
        }//释放WriteLockGuard
        catch (const std::filesystem::filesystem_error &e)
        {
            const std::string &errorMessage = e.what();
            // debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());
        }
        catch (const std::ios_base::failure &e)
        {
            const std::string &errorMessage = e.what();
            // debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());
        }
        catch (const std::exception &e)
        {
            const std::string &errorMessage = e.what();
            // debug_log(DLOG_INFO, "Error: %s", errorMessage.c_str());
        }

        // Introduce a delay before retrying
        std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMilliseconds));
    }
    // debug_log(DLOG_ERROR, "writeToJsonFile failed, Retry 5 times!!!");
}

/*******************************************************************************
 * 名称: readJsonFromFile
 * 描述: 读取json文件
 * 作者: mkx
 * 参数: void
 * 返回: void
 ******************************************************************************/
json readJsonFromFile(const std::string &filename, int maxRetries, int retryDelayMs)
{
    checkJsonFile(filename);
    FileLock& fileLock = manager.getFileLock(filename);//以文件名为KEY获取对应锁
    for (int attempt = 0; attempt < maxRetries; attempt++)
    {
        try
        {
            ReadLockGuard lock(fileLock); // 创建 ReadLockGuard 对象,自动加锁
            // std::this_thread::sleep_for(std::chrono::seconds(1));
            // 打开文件并直接定位到末尾
            std::ifstream inputFile(filename, std::ios::ate | std::ios::binary); 
            if (inputFile.is_open())
            {
                // 检查文件是否为空(避免创建空文件时要写入空json)
                if (inputFile.tellg() == 0) // 获取文件大小
                {
                    std::cout << "R" <<  json().dump(4) << std::endl; // 使用 dump(4) 格式化输出,缩进为 4 个空格
                    return json(); // 返回一个空的 JSON 对象
                }
                inputFile.seekg(0, std::ios::beg); // 重置文件指针到文件开头
                json loadedData;
                inputFile >> loadedData;
                inputFile.close();
                loadedData["WR"] = "R";

                std::cout  << loadedData.dump(4) << std::endl; // 使用 dump(4) 格式化输出,缩进为 4 个空格
                return loadedData;
            }
            else
            {
                // debug_log(DLOG_INFO, "File %s not found. ", filename.c_str());
                std::cout << "else Loaded JSON " << std::endl;
            }
        }//释放ReadLockGuard
        catch (const std::ios_base::failure &e)
        {
            const std::string &errorMessage = e.what();
            // debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());
        std::cout << "aaaaaaLoaded JSON " << std::endl;
        }
        catch (const std::exception &e)
        {
            const std::string &errorMessage = e.what();
            // debug_log(DLOG_INFO, "Error: %s", errorMessage.c_str());
        std::cout << errorMessage.c_str() << std::endl;
        }
        // 重试之前等待一段时间
        std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
    }
    // debug_log(DLOG_ERROR, "readJsonFromFile failed, Retry 5 times!!!");
    // 重试次数超过最大限制,返回空的 JSON 对象表示读取失败
    return json();
}


// 使用fork创建进程并执行任务
void createProcess1(const std::string& proID,const std::string& fileName) {
    pid_t pid = fork();
    
    if (pid == 0) {
        json data = {
            {"name", fileName},
            {"age", proID},
            {"WR", "W"}
        }; 
        // 在子进程中创建线程进行读写操作
        std::thread reader1(readJsonFromFile,fileName,1,100);
        std::thread reader2(readJsonFromFile,fileName,2,100);

        std::thread writer1(writeToJsonFile,fileName,data,3,100);
        std::thread writer2(writeToJsonFile,fileName,data,4,100);

        reader1.join();
        reader2.join();
        writer1.join();
        writer2.join();

        exit(0); // 退出子进程
    } else if (pid > 0) {
        // 父进程
        wait(NULL); // 等待子进程结束
    } else {
        std::cerr << "Fork failed!" << std::endl;
    }
}

int main() {
    std::string fileName1 = "1.txt";
    std::string fileName2 = "2.txt";
    std::string fileName3 = "3.txt";
    std::string fileName4 = "4.txt";
    std::string fileName5 = "5.txt";
    std::string fileName6 = "6.txt";
    std::string fileName7 = "7.txt";
    std::string fileName8 = "8.txt";
    std::string fileName9 = "9.txt";
    std::string fileName0 = "0.txt";
    std::string fileNamea = "a.txt";
    std::string fileNameb = "b.txt";
    std::string fileNamec = "c.txt";
    std::string fileNamed = "d.txt";
    std::string fileNamee = "e.txt";
    std::string fileNamef = "f.txt";
    std::string fileNameg = "g.txt";

        // 创建多个进程进行并行执行
    std::vector<std::thread> threads;
    threads.emplace_back(createProcess1,"P1", fileName1);
    threads.emplace_back(createProcess1,"P2", fileName1);
    threads.emplace_back(createProcess1,"P3", fileName1);
    threads.emplace_back(createProcess1,"P4", fileName1);

    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

原文地址:https://blog.csdn.net/qq_38202733/article/details/145123538

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!