自学内容网 自学内容网

POSIX信号量

7. POSIX信号量

7.1 相关概念

7.1.1 理解信号量

信号量/信号灯的本质是一把计数器,类似 int cnt = n
用来描述临界资源中资源数量的多少!

  1. 申请计数器成功,就表示我具有访问资源的权限了
  2. 申请了计数器资源,我当前并没有访问我想要的资源。申请了计数器资源是对资源的预订机制
  3. 计数器可以有效保证进入共享资源的执行流的数量
  4. 所以每一个执行流,想访问共享资源中的一部分的时候,不是直接访问,而是先申请计数器资源。
    类似看电影的先买票!

7.1.2 二元信号量

我们把临界资源位1,信号量值只能为1,0两态的计数器叫做 二元信号量,本质就是一个锁

将计数器设置为1,资源为1的本质:其实就是将临界资源不要分成很多块了,而是当做一个整体。整体申请,整体释放

7.1.3 PV操作

申请信号量,本质是对计数器进行–操作,也就是P操作
释放资源,释放信号量,本质是对计数器进行++操作,也就是V操作

为了保证–操作和++操作不会被其他进程打扰,我们让该操作变成原子操作
即:要么不做,要做就做完,两态的,没有“正在做”这样的概念!

7.1.4 信号量凭什么是进程间通信的一种

  1. 通信不仅仅是通信数据,互相协同也是
  2. 要协同,本质也是通信,信号量首先要被所有的通信进程看到

7.1.5 总结

  1. 信号量本质是一把计数器,其PV操作是原子的
  2. 执行流想访问资源,必须先申请信号量资源,得到信号量之后,才能访问临界资源
  3. 信号量值1, 0两态的,二元信号量,就是互斥功能
  4. 申请信号量的本质: 是对临界资源的预订机制

7.2 信号量接口

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
    pshared:0表示线程间共享,非零表示进程间共享
    value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量 P()

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量 V()

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V()

7.3 基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性

image-20241004154411262

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

生产者和消费者访问该环形队列需要满足一下以下三个原则

  1. 指向同一个位置的时候,只能一方来访问
    生产者访问
    为满:消费者访问
  2. 消费者不能超过生产者
  3. 生产者不能 套消费者一个圈

生产者关注的资源:还有多少剩余空间 SpaceSem = N
消费者关注的数据:还有多少剩余数据 DataSem = 0

/* 生产者 */
P(SpaceSem)
// 生产
V(DataSem)
    
/* 消费者 */
P(DataSem)
// 生产
V(SpaceSem)

7.4 单生产者单消费者

// RingQueue.hpp
template<class T>
class RingQueue
{
    static const size_t defaultCap = 5;
private:
    void P(sem_t& x)
    {
        sem_wait(&x);
    }
    
    void V(sem_t& x)
    {
        sem_post(&x);
    }
public:
    RingQueue(size_t capacity = defaultCap) 
    : _ringQueue(capacity)
    , _capacity(capacity)
    , _cPos(0)
    , _pPos(0)
    {
        sem_init(&_cDataSem, 0, 0);
        sem_init(&_pSpaceSem, 0, capacity);
    }

    ~RingQueue()
    {
        sem_destroy(&_cDataSem);
        sem_destroy(&_pSpaceSem);
    }

    void Push(const T& x) 
    {        
        P(_pSpaceSem);
        _ringQueue[_pPos] = x;
        V(_cDataSem);
        _pPos++;    
        _pPos %= _capacity;     // 维持队列的环形特征
    }

    void Pop(T* x)
    {
        P(_cDataSem);
        *x = _ringQueue[_cPos];
        V(_pSpaceSem);

        _cPos++;
        _cPos %= _capacity;     // 维持队列的环形特征
    }

private:
    vector<T> _ringQueue;    // 用数组模拟环形队列
    size_t _capacity;        // 队列大小
    size_t _cPos;            // 消费者下标
    size_t _pPos;            // 生产者下标
    sem_t _cDataSem;         // 消费者关心还有多少剩余数据
    sem_t _pSpaceSem;        // 生产者关心还有多少剩余空间
};
// main.cc
void* Producer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    while(true) {
        // 获取数据
        int data = rand() % 10;
        // 生产数据
        rq->Push(data);
        printf("Producer put data: %d\n", data);
        sleep(1);
    }
    return nullptr;
}

void* Consumer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    while(true) {
        // 消费数据
        int data = 0;
        rq->Pop(&data);
        printf("Consumer get data: %d\n", data);
        // 处理数据
        // sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, rq);
    pthread_create(&p, nullptr, Producer, rq);
    pthread_join(c, nullptr);     
    pthread_join(p, nullptr);     
    cout << "main thread quit" << endl;
    delete rq;
    return 0;
}

image-20241007104110977

生产者生产一个数据,消费者才能拿到一个数据,生产者和消费者之间有很好的同步性和互斥性
更改sleep()的位置,让消费者消费的慢一点

image-20241007104542832

7.6 多生产者和多消费者

需要满足生产者和生产者之间的互斥和消费者和消费者之间的互斥

任何时刻,只能有一个生产者或一个消费者在环形队列里进行生产或消费(在队列不满不空的情况下,可以有两个线程在同时访问同一个队列)
这里的临界资源是两个下标,所以要有两把锁

// RingQueue.hpp
template<class T>
class RingQueue
{
    static const size_t defaultCap = 5;
private:
    void P(sem_t& x)
    {
        sem_wait(&x);
    }
    
    void V(sem_t& x)
    {
        sem_post(&x);
    }

    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void UnLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(size_t capacity = defaultCap) 
    : _ringQueue(capacity)
    , _capacity(capacity)
    , _cPos(0)
    , _pPos(0)
    {
        // cout << "capacity:" << capacity << endl;
        sem_init(&_cDataSem, 0, 0);
        sem_init(&_pSpaceSem, 0, capacity);
    }

    ~RingQueue()
    {
        sem_destroy(&_cDataSem);
        sem_destroy(&_pSpaceSem);
    }

    void Push(const T& x) 
    {        
        P(_pSpaceSem);

        Lock(_pMutex);          // 加锁和解锁在PV操作之后,提高并发度。只有当信号量允许时,进程才会尝试获取锁。这样可以减少锁的争用,提高系统的整体效率。
        _ringQueue[_pPos] = x;
        _pPos++;    
        _pPos %= _capacity;     // 维持队列的环形特征
        UnLock(_pMutex);

        V(_cDataSem);

    }
        

    void Pop(T* x)
    {
        P(_cDataSem);
        Lock(_cMutex);          // 加锁和解锁在PV操作之后,提高并发度。只有当信号量允许时,进程才会尝试获取锁。这样可以减少锁的争用,提高系统的整体效率。
        *x = _ringQueue[_cPos];
        _cPos++;
        _cPos %= _capacity;     // 维持队列的环形特征
        UnLock(_cMutex);
        V(_pSpaceSem);
    }

private:
    vector<T> _ringQueue;    // 用数组模拟环形队列
    size_t _capacity;        // 队列大小
    size_t _cPos;            // 消费者下标
    size_t _pPos;            // 生产者下标
    sem_t _cDataSem;         // 消费者关心还有多少剩余数据
    sem_t _pSpaceSem;        // 生产者关心还有多少剩余空间
    pthread_mutex_t _cMutex; // 消费者的锁
    pthread_mutex_t _pMutex; // 生产者的锁
};
static const size_t C_NUM = 5;
static const size_t P_NUM = 5;

struct ThreadData
{
    RingQueue<int>* rq;
    string name;
};

void* Producer(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    while(true) {
        // 获取数据
        int data = rand() % 10;
        const char* name = td->name.c_str();
        RingQueue<int>* rq = td->rq;
        // 生产数据
        rq->Push(data);
        printf("%s, put data: %d\n", name, data);
        // sleep(1);
    }
    delete td;
    return nullptr;
}

void* Consumer(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    while(true) {
        // 消费数据
        int data = 0;
        const char* name = td->name.c_str();
        RingQueue<int>* rq = td->rq;
        rq->Pop(&data);
        printf("%s, put data: %d\n", name, data);
        // 处理数据
        sleep(1);
    }
    delete td;
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c[C_NUM], p[P_NUM];
    for(size_t i = 0; i < P_NUM;++i) {
        ThreadData* pTd = new ThreadData();
        pTd->rq = rq;
        pTd->name = "Producer Thread-" + to_string(i+1);
        pthread_create(c+i, nullptr, Producer, pTd);
    }
    for(size_t i = 0; i < C_NUM;++i) {
        ThreadData* cTd = new ThreadData();
        cTd->rq = rq;
        cTd->name = "Consumer Thread-" + to_string(i+1);
        pthread_create(p+i, nullptr, Consumer, cTd);
    }
    for(size_t i = 0; i < P_NUM;++i)
        pthread_join(c[i], nullptr);
    for(size_t i = 0; i < C_NUM;++i)
        pthread_join(p[i], nullptr);
    delete rq;
    return 0;
}

image-20241007135107313

8. 线程池

概念:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

线程池的种类:

  1. 创建固定数量线程池,循环从任务队列中获取任务对象
  2. 获取到任务对象后,执行任务对象中的任务接口
// ThreadPool.cc
struct ThreadData
{
    pthread_t tid;
    string name;
};

// T表示任务的类型
template<class T>
class ThreadPool
{
    static const size_t defaultNum = 5;
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadWake()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }

    string GetName(const pthread_t& t)
    {
        for(const auto& e : _threads) {
            if(e.tid == t) 
                return e.name;
        }
        return "";
    }
public:
    ThreadPool(size_t num = defaultNum) : _threads(num) 
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 线程执行任务,不需要this指针,所以设置成static
    static void* StartRoutine(void* args)
    {   
        ThreadPool* tp = static_cast<ThreadPool*>(args);
        const char* name = tp->GetName(pthread_self()).c_str();
        while(true) {
            tp->Lock();
            // while循环判断防止误唤醒
            while(tp->IsQueueEmpty()) {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->UnLock();
            t.Run();
            printf("%s, res: %s\n", name, t.GetResult().c_str());
        }
        return nullptr;
    }

    // 创建进程
    void Start()
    {
        int num = _threads.size();
        for (size_t i = 0; i < num; i++) {
            _threads[i].name = "Thread-" + to_string(i+1);
            pthread_create(&(_threads[i].tid), nullptr, StartRoutine, this);
        }
    }

    T Pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    // 添加任务
    void Push(const T& t)
    {
        Lock();
        _tasks.push(t);
        ThreadWake();
        UnLock();
    }
private:
    vector<ThreadData> _threads;
    queue<T> _tasks;    // 任务,这是临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};
// main.cc
#include "Task.hpp"
#include "ThreadPool.cc"
#include <ctime>

int main()
{
    srand((unsigned int)time(nullptr));
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Start();
    while(true) {
        int x = rand() % 50;
        int y = rand() % 10;
        char op = opStr[rand() % 4];
        Task t(x, y, op);
        tp->Push(t);
        printf("Main thread make task, %s\n", t.GetTask().c_str());
        sleep(1);
    }
    return 0;
}

image-20241007155652880

9. 简单封装C++的线程

// 返回值是void, 参数是T的包装器
template<class T>
using fun = function<void(T)>;

static int NUM = 1;
template<class T>
class Thread
{
public:
    Thread(fun<T> f, T data = 0) : _f(f), _data(data)
    {}

    // 让线程执行StartRoutine函数
    void Run() 
    {
        _name = "Thread-" + to_string(NUM++);
        _isRunning = true;
        pthread_create(&_tid, nullptr, StartRoutine, this);
    }

    static void* StartRoutine(void* args) 
    {
        Thread* td = static_cast<Thread*>(args);
        td->Entry();
        return nullptr;
    }

    void Join()
    {
        pthread_join(_tid, nullptr);
        _isRunning = false;
    }

    string GetName()
    {
        return _name;
    }

    bool IsRunning() 
    {
        return _isRunning;
    }

    // 回调fun
    void Entry()
    {
        _f(_data);
    }
private:
    pthread_t _tid = 0;
    string _name = "";
    bool _isRunning = false;
    T _data;     // 函数的参数
    fun<T> _f;      // 让线程执行某一个任务
};
// main.cc
#include "Thread.cc"
#include <vector>

void Run(void* args)
{
    while(true) {
        printf("New thread\n");
        // cout << a << endl;
        sleep(1);
    }
}

int main()
{
    // Thread<int> t(Run, 123);
    // t.Run();
    // t.Join();
    vector<Thread<void*>> v;
    for (size_t i = 0; i < 10; i++) {
        v.push_back(Thread<void*>(Run));
    }
    
    for(auto& e : v) {
        e.Run();
    }

    for(auto& e : v) {
        e.Join();
    }
    return 0;
}

image-20241007171143171


让线程池使用咱自己写的C++线程类

struct ThreadData
{
    pthread_t tid;
    string name;
};

// T表示任务的类型
template<class T>
class ThreadPool
{
    static const size_t defaultNum = 5;
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadWake()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _tasks.empty();
    }

    string GetName(pthread_t& t)
    {
        for(auto& e : _threads) {
            if(e.GetTid() == t) 
                return e.GetName();
        }
        return "";
    }
public:
    ThreadPool(size_t num = defaultNum)
    {
        for(size_t i = 0; i < num;++i) {    
            _threads.push_back(Thread<void*>(StartRoutine, this));
        }
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 线程执行任务,不需要this指针,所以设置成static
    static void StartRoutine(void* args)
    {   
        ThreadPool* tp = static_cast<ThreadPool*>(args);
        pthread_t self = pthread_self();
        string name = tp->GetName(self);
        while(true) {
            tp->Lock();
            // while循环判断防止误唤醒
            while(tp->IsQueueEmpty()) {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->UnLock();
            t.Run();
            printf("%s, res: %s\n", name.c_str(), t.GetResult().c_str());
        }
    }

    // 创建进程
    void Start()
    {
        int num = _threads.size();
        for (size_t i = 0; i < num; i++) {
            _threads[i].Run();
        }
    }

    T Pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    // 添加任务
    void Push(const T& t)
    {
        Lock();
        _tasks.push(t);
        ThreadWake();
        UnLock();
    }
private:
    vector<Thread<void*>> _threads;
    queue<T> _tasks;    // 任务,这是临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

image-20241007175644413


原文地址:https://blog.csdn.net/Suinnn/article/details/142743970

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!