自学内容网 自学内容网

Linux -线程互斥与同步


1.进程线程间互斥相关背景概念

  1. 临界资源:多线程执行流中共享的资源通常叫临界资源。
  2. 临界区:访问临界资源的代码区域叫临界区。
  3. 互斥:只允许一个执行流进入临界区访问临界资源,对临界资源起到了保护作用。
  4. 原子性:一个操作在执行过程中不会被中断,即要么执行完成,要么就是不执行,原子性保证了数据的一致性和完整性。

2.互斥量mutex

2.1使用多线程模拟售票系统

/*这里使用的创建线程等操作是进程封装过的*/


int t = 50;//线程贡献资源

//售票函数
void func()
{
while(true)
{
        if(t > 0)
        std::cout<<"目前是第"<< t--<< "张票"<<std::endl;  
        else
        break; 
}
}

int main()
{
    //创建对象
    Thread t1(func);
    Thread t2(func);
    Thread t3(func);
    Thread t4(func);

    //创建线程
    t1.Start();
    t2.Start();
    t3.Start();
    t4.Start();

    
    //等待线程
    t1.Join();
    t2.Join();
    t3.Join();
    t4.Join();

    return 0;
}

区分临界资源和临界区
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/9e605a88dbc44493833c36773f342754.png

运行结果
在这里插入图片描述
现象:出现负数,不是设置了 if( t > 0) 了吗?这是因为 if( t > 0) 不是原子性的。
在这里插入图片描述
如何解决?

  1. 线程之间需要存在互斥,当一个线程进入临界区时,就不允许其他线程进入临界区。
  2. 当线程执行完临界区代码后,再允许其他线程中的一个进入临界区。
  3. 完成上述1 、2 操作的叫做互斥量

在这里插入图片描述

2.2互斥量相关接口

2.2.1初始化互斥量
在使用互斥锁时必须进行初始化。
2.2.1.1静态方法

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

2.2.1.2动态分配
函数原型

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

参数

  1. pthread_mutex_t *mutex:指向需要初始化的互斥锁的指针。pthread_mutex_t 是用于表示互斥锁的类型。
  2. const pthread_mutexattr_t *attr:指向互斥锁属性的指针。如果设置为NULL,则使用默认的互斥锁属性。互斥锁属性允许你定制互斥锁的行为,例如,是否为递归锁、是否为错误检查锁等。

返回值

  1. 成功时,返回 0。
  2. 失败时,返回错误码。

2.2.2销毁互斥锁
2.2.2.1注意

  1. 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁 。
  2. 不要销毁⼀个已经加锁的互斥量 。
  3. 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁。
  4. 不要多次销毁一个互斥量。

2.2.2.2pthread_mutex_destroy函数
功能

用于销毁互斥锁。

函数原型

int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数

pthread_mutex_t *mutex:指向需要销毁的互斥锁的指针。这个互斥锁必须是之前已经成功初始化过的,并且当前没有被任何线程锁定。

返回值

  1. 成功时,返回 0。
  2. 失败时,返回错误码。

2.2.3pthread_mutex_lock函数
功能

锁定一个互斥锁。

函数原型

int pthread_mutex_lock(pthread_mutex_t *mutex);

参数

pthread_mutex_t *mutex:指向需要锁定的互斥锁的指针。这个互斥锁必须是之前已经成功初始化过的。

返回值

  1. 成功时,返回 0。
  2. 失败时,返回错误码。

注意

  1. 阻塞行为:如果互斥锁已经被另一个线程锁定,调用 pthread_mutex_lock 的线程将被阻塞,直到互斥锁被释放为止。这意味着线程将暂停执行,直到它能够获得互斥锁。
  2. 死锁避免:为了避免死锁,程序员应该确保每个线程在锁定互斥锁后最终都会释放它(使用 pthread_mutex_unlock)。此外,还应该避免嵌套锁定(即一个线程尝试多次锁定同一个互斥锁),除非使用的是递归锁。

2.2.4pthread_mutex_unlock函数
功能

用于释放之前由调用线程锁定的互斥锁。

函数原型

int pthread_mutex_unlock(pthread_mutex_t *mutex);

参数

pthread_mutex_t *mutex:指向需要释放的互斥锁的指针。这个互斥锁必须是之前已经被调用线程成功锁定过的,

返回值

  1. 成功时,返回 0。
  2. 失败时,返回错误码。

注意

  1. 每个对 pthread_mutex_lock 的成功调用都应该有一个对应的 pthread_mutex_unlock 调用。如果忘记释放互斥锁,或者释放了未锁定的互斥锁,都可能导致未定义行为或死锁。
  2. 避免重复释放:不要尝试多次释放同一个互斥锁。一旦互斥锁被释放,其所有权将转移到下一个锁定它的线程(如果有的话)。如果尝试再次释放它,将导致未定义行为。

2.2.5修改售票系统

using namespace ThreadModule;

int t = 10000;

//互斥锁 并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void func()
{
    
    while(true)
    {
        //sleep(1);
        //加锁
    pthread_mutex_lock(&mutex);
        if(t > 0)
        std::cout<<"目前是第"<< t--<< "张票"<<std::endl;  
        else
        break; 
          //解锁
    pthread_mutex_unlock(&mutex);
    }

  
}
int main()
{
    //创建对象
    Thread t1(func);
    Thread t2(func);
    Thread t3(func);
    Thread t4(func);

    //创建线程
    t1.Start();
    t2.Start();
    t3.Start();
    t4.Start();

    
    //等待线程
    t1.Join();
    t2.Join();
    t3.Join();
    t4.Join();

    return 0;
}

运行结果:不会出现之前的问题了
在这里插入图片描述

3.互斥锁的原理

3.1补充

为了实现互斥锁操作,⼤多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有⼀条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,⼀个处理器上的交换指令执行时另⼀个处理器的交换指令只能等待总线周期。

3.2原理

在这里插入图片描述

4.互斥量的封装

#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
        Mutex(const Mutex &) = delete;
        const Mutex &operator=(const Mutex &) = delete;

    public:
        Mutex()
        {
            int n = pthread_mutex_init(&_lock, nullptr); // 初始化锁
            (void)n;                                     // 防止报警告
        }

        // 上锁
        void Lock()
        {
            int n = pthread_mutex_lock(&_lock);
            (void)n;
        }

        // 返回锁指针
        pthread_mutex_t *LockPtr()
        {
            return &_lock;
        }

        // 解锁
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_lock);
            (void)n;
        }

        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_lock);
            (void)n;
        }

    private:
        pthread_mutex_t _lock;
    };
}

5.线程同步

5.1同步概念与竟态条件

  1. 在保证数据安全的前提下(如互斥),线程在访问临界资源时能够按照一定顺序访问,从而避免饥饿问题,叫做同步。
  2. 由于时序问题导致程序异常,叫做竟态条件。

5.2条件变量

5.2.1条件变量的定义

条件变量(Condition Variables)是并发编程中用于线程同步的一种机制,它允许一个或多个线程在某个特定条件发生时进行挂起(阻塞)和唤醒(继续执行),通过条件变量能实现线程同步操作,和避免竟态问题。

5.2.2条件变量的使用
5.2.2.1条件变量

pthread_cond_t name;

5.2.2.2条件变量的初始化
函数原型

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

参数

  1. cond:指向需要初始化的条件变量的指针。这个条件变量在调用此函数之前应该是未初始化的。
  2. attr:指向条件变量属性的指针。如果设置为NULL,则使用默认属性。条件变量属性允许你指定条件变量的共享性(进程间共享还是线程间共享)等特性,但在大多数用例中,使用默认属性就足够了。

返回值

  1. 如果函数成功执行,则返回 0。
  2. 如果发生错误,则返回错误码。

5.2.2.3条件变量的销毁
函数原型

int pthread_cond_destroy(pthread_cond_t *cond)

参数

cond:指向初始化完的条件变量的指针。

返回值

  1. 如果函数成功执行,则返回 0。
  2. 如果发生错误,则返回错误码。

注意

  1. 无等待线程:在调用 pthread_cond_destroy 之前,必须确保没有任何线程在等待(通过 pthread_cond_wait、pthread_cond_timedwait 等)该条件变量。
  2. 互斥锁:虽然销毁条件变量本身不需要持有任何互斥锁,但通常你会在持有与条件变量相关联的互斥检查锁是否有的情况下线程在等待条件变量(例如,通过维护一个计数器来跟踪等待线程的数量)。然而,pthread_cond_destroy
    本身并不要求你持有这个互斥锁。
  3. 初始化状态:被销毁的条件变量之后不能再次被使用,除非它再次被初始化。在销毁条件变量后,你可以重新初始化它或使用一个新的条件变量。

5.2.2.4条件变量的等待
函数原型

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

参数

  1. cond:指向要使线程等待的条件变量的指针。 mutex:指向与条件变量关联的互斥锁的指针。这个互斥锁必须在调用
  2. pthread_cond_wait 之前由调用线程持有(即处于锁定状态)。

返回值

  1. 如果函数成功执行,则返回 0。
  2. 如果发生错误,则返回错误码。

注意

  1. 互斥锁持有:在调用 pthread_cond_wait 之前,调用线程必须持有与条件变量关联的互斥锁。函数会原子地释放这个互斥锁并使线程进入等待状态。
  2. 重新获得互斥锁:当线程被唤醒时,它会重新获得之前释放的互斥锁(重新申请锁)。这是为了确保线程在检查导致它等待的条件时不会被其他线程干扰。
  3. 避免忙等待:与轮询(忙等待)相比,使用 pthread_cond_wait 可以更有效地利用系统资源,因为线程在等待条件变量时被挂起,不会占用 CPU 时间。
  4. 虚假唤醒:即使没有其他线程对条件变量调用 pthread_cond_signal 或 pthread_cond_broadcast(唤醒),等待线程也可能被唤醒(这称为“虚假唤醒”)。因此,等待线程在重新获得互斥锁后应该总是重新检查导致它等待的条件。

补充:为什么 pthread_cond_wait 需要互斥量?
假设不需要互斥量:那么解锁和加锁就需要我们自己去写了,如

// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

上述代码设计是错误的,因为解锁+条件变量等待这个过程不是原子性的,当解锁之后可能条件满足,此时信号发出,但是还没进行等待,就错过信号了,最终可能导致该线程一直阻塞,所以解锁+条件变量等待是原子性操作才行。而pthread_cond_wait的操作是原子性的。

补充:等待队列

等待队列:等待队列是由需要等待条件成立的线程组成的队列。当线程调用条件变量的wait函数时,如果条件不满足,该线程会被加入到等待队列中,并释放它当前持有的互斥锁。此时,该线程进入等待状态,不再占用CPU资源。

5.2.2.5条件变量的唤醒
函数原型

int pthread_cond_broadcast(pthread_cond_t *cond);//广播式唤醒线程
int pthread_cond_signal(pthread_cond_t *cond);//选取其中一个

参数
cond:指向要广播的条件变量的指针。

返回值

  1. 如果函数成功执行,则返回 0。
  2. 如果发生错误,则返回错误码。

区别

  1. pthread_cond_signal:通知(或唤醒)至少一个正在等待指定条件变量的线程。重要的是要理解“至少一个”这个表述:如果有多个线程在等待同一个条件变量,那么 pthread_cond_signal 会唤醒这些等待线程中的一个,但具体是哪个线程被唤醒是不确定的,这取决于操作系统的线程调度机制。
  2. pthread_cond_broadcast:唤醒所有等待在指定条件变量上的线程。

5.2.3简单案例
通过上述函数实现一个测试不同唤醒函数的效果。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>

pthread_cond_t cond;   // 条件变量
pthread_mutex_t mutex; // 锁

int t = 100;

void *func(void *arg)
{
    while (true)
    {
        // 上锁
        pthread_mutex_lock(&mutex);

        // 添加条件变量 -等待+释放锁  唤醒+重新申请锁
        pthread_cond_wait(&cond, &mutex);

        if (t > 0)
        {
            printf("%s : t = %d\n", (char *)arg, t);    //这里需要使用\n保证内容刷新到内核级缓冲区
            t--;
        }
        else
        {
            break;
            pthread_mutex_unlock(&mutex);
        }

        // 释放锁
        pthread_mutex_unlock(&mutex);
    }

    return nullptr;
}

int main()
{
    // 1.初始化锁和条件变量
    pthread_mutex_init(&mutex, nullptr);
    pthread_cond_init(&cond, nullptr);

    // 2.创建线程
    pthread_t pt1, pt2;
    pthread_create(&pt1, nullptr, func, (void *)"线程-1");
    pthread_create(&pt2, nullptr, func, (void *)"线程-2");

    // 3.主线程进程唤醒 
    sleep(3);
    while (true)
    {
        //pthread_cond_broadcast(&cond); // 广播式唤醒线程
        pthread_cond_signal(&cond);    // 选取其中一个
        sleep(1);
    }

    // 4.主线程进程等待
    pthread_join(pt1, nullptr);
    pthread_join(pt2, nullptr);

    //5.销毁条件变量和锁
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

pthread_cond_signal唤醒
在这里插入图片描述
pthread_cond_broadcast唤醒
在这里插入图片描述
现象:使用pthread_cond_signal是按照一定顺序进行访问的,使用pthread_cond_broadcas一次性将所有线程唤醒,唤醒后哪个线程会访问临界资源是不确定的。

5.3条件变量的封装

使用面向对象的方式对条件进行封装

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace LockModule;

    class Cond
    {
    public:
        Cond()
        {
            int n = pthread_cond_init(&_cond, nullptr); // 初始化条件变量
            (void)n;
        }

        // 等待
        void Wait(Mutex &mutex) // 让我们的线程释放曾经持有的锁!Mutex -- 封装的锁
        {
            int n = pthread_cond_wait(&_cond, mutex.LockPtr()); // 进行等待
            (void)n;
        }

        // 唤醒至少一个线程
        void Notify()
        {
            int n = pthread_cond_signal(&_cond);
            (void)n;
        }

        // 唤醒全部线程
        void NotifyAll()
        {
            int n = pthread_cond_broadcast(&_cond);
            (void)n;
        }

        ~Cond()
        {
            int n = pthread_cond_destroy(&_cond);
            (void)n;
        }

    private:
        pthread_cond_t _cond; // 条件变量
    };
}

6.生产者消费者模型

6.1简介

  1. 生产者消费者模型是一种用于描述多线程编程中协作关系的经典模型。该模型基于生产者和消费者的角色,通过共享缓冲区来实现数据传递。
  2. 生产者:负责生产数据,并将数据放入共享缓冲区中。生产者线程可以不断地生成新的数据项,并将其添加到缓冲区中,供消费者线程使用。
  3. 消费者:负责消费数据,即从共享缓冲区中取出数据进行处理。消费者线程可以从缓冲区中读取数据项,并进行相应的处理或计算。
  4. 共享缓冲区:用于存储生产者生产的数据,供消费者消费。缓冲区的大小是有限的,因此生产者需要在缓冲区未满时生产数据,而消费者需要在缓冲区非空时消费数据。
  5. 特点
  • 解耦
  • ⽀持并发
  • ⽀持忙闲不均

6.2通过超市的例子加深理解

在这里插入图片描述

6.3基于BlockingQueue的⽣产者消费者模型

6.3.1简介
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
在这里插入图片描述

6.3.2通过c++队列实现生产者消费者模型

  1. 设置一个锁使 生产者和生产者、消费者和消费者、生产者和消费者实现互斥。
  2. 设置两个条件变量来使生产者和消费者进行同步。
    BlockingQueue.hpp
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"

namespace BlockQueueModule
{
    using namespace LockModule;
    using namespace CondModule;

    static const int gcap = 10; //最大容量

    template <typename T>
    class BlockQueue
    {
    private:
        bool IsFull()  //是否满
        {
            return bool(_q.size() == _cap);
        }
        bool IsEmpty()  //是否空
        {
            return _q.empty();
        }

    public:
        BlockQueue(int cap = gcap)
        :_cap(cap),
        _cwait_num(0),
        _pwait_num(0)

        {
        }

        void Equeue(const T &in) // 生产者
        {
            LockGuard lock(_mutex); //上锁

            while(IsFull()) //判断是否满了,使用while是防止伪唤醒
            {
                //进行等待
                _pwait_num++;
                _productor_cond.Wait(_mutex);   //可能会有伪唤醒
                _pwait_num--;
            }

            //到这里肯定不为满了 - 生产数据
            _q.push(in);

            //是否有需要唤醒的消费者线程
            if(_cwait_num > 0)
            {
                _consumer_cond.NotifyAll(); //进行全部唤醒
            }
        }

        void Pop(T *out) // 消费者
        {
            LockGuard lock(_mutex); //上锁
            while(IsEmpty())
            {
                _cwait_num++;
                _consumer_cond.Wait(_mutex);
                _cwait_num--;
            }

            //到这里肯定不为空 - 消费数据
            *out = _q.front();
            _q.pop();

            //是否有需要唤醒的生产者线程
            if(_pwait_num > 0)
            {
                _productor_cond.NotifyAll();
            }

        }

        ~BlockQueue()
        {
        }

    private:
        std::queue<T> _q;     // 保存数据的容器,临界资源
        int _cap;             // bq最大容量
        Mutex _mutex;         // 互斥
        Cond _productor_cond; // 生产者条件变量
        Cond _consumer_cond;  // 消费者条件变量

        int _cwait_num;       //正在等待的消费者线程数量
        int _pwait_num;       //正在等待的生产者线程数量
    };
}

Main.cc

#include "BlockingQueue.hpp"
#include <pthread.h>
#include <unistd.h>

using namespace BlockQueueModule;

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int data;
        // 1. 从bq拿到数据
        bq->Pop(&data);

        // 2.做处理
        printf("Consumer, 消费了一个数据: %d\n", data);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    int data = 10;
    while (true)
    {
        sleep(2);
        bq->Equeue(data);
        printf("producter 生产了一个数据: %d\n", data);
        data++;
    }
}

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源

    //创建线程
    pthread_t c1, p1, c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    pthread_create(&p2, nullptr, Productor, bq);
    pthread_create(&p3, nullptr, Productor, bq);

    //等待线程
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    
    delete bq;
    return 0;
}

代码总结:

  1. 有了互斥和同步,上述代码既可以实现单生产单消费也可以实现多生产多消费。
  2. 在判断满或空时应注意伪唤醒情况。
  3. 生产者生产的可以是数据也可以是任务等。

7.POSIX信号量简单使用

7.1概念

POSIX信号量是一种用于进程间或线程间同步的机制,它通常由一个整数值表示,并可以进行原子增减操作。这个整数值代表了可用资源的数量,当值为正时,表示有资源可用;当值为0时,表示资源已被占用。

7.2本质

信号量的本质是一个计数器。

在这里插入图片描述

7.3POSIX信号量与其相关函数

7.3.1POSIX信号量和头文件

#include <semaphore.h>
sem_t sem_name;

7.3.2初始化POSIX信号量
函数原型

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数

  1. sem:指向信号量结构的指针。这个指针指向将要被初始化的信号量对象。
  2. pshared:控制信号量的共享属性。
    如果 pshared 的值为 0,则信号量被当前进程的所有线程共享。此时,信号量应放置在当前进程的所有线程都可见的地址上,如全局变量或堆上动态分配的变量。
    如果pshared 的值大于 0,则信号量在进程间共享。这通常意味着信号量被放置在共享内存区域中,以便多个进程可以访问它。
  3. value:信号量的初始值。这个值表示可用资源的数量。当信号量的值大于 0 时,表示有资源可用;当信号量的值等于 0 时,表示资源已被占用。

返回值

  1. 成功时,sem_init 返回 0。
  2. 失败时,sem_init 返回 -1,并设置 errno 以指示错误类型。

注意

  1. 在调用 sem_init 之前,应确保信号量指针 sem 指向的内存区域是有效的,并且没有被其他线程或进程使用。
  2. 信号量的初始值 value 应根据实际应用场景进行合理设置。初始值太大可能导致资源竞争不充分,初始值太小则可能导致线程频繁阻塞和唤醒。

7.3.3销毁POSIX信号量
函数原型

int sem_destroy(sem_t *sem);

参数

sem:指向要销毁的信号量结构的指针。这个指针必须指向一个由 sem_init 成功初始化的信号量。

返回值

  1. 成功时,sem_init 返回 0。
  2. 失败时,sem_init 返回 -1,并设置 errno 以指示错误类型。

注意

  1. 有效性:在调用 sem_destroy 之前,应确保信号量 sem是有效的,并且是由当前进程初始化的。如果信号量是由其他进程初始化的(例如,在共享内存中),则当前进程不应尝试销毁它。
  2. 未决操作:在调用 sem_destroy 之前,应确保没有线程在等待该信号量(即没有未完成的 sem_wait 或 sem_trywait
    操作)。否则,行为是未定义的,可能会导致程序崩溃或死锁。
  3. 多次销毁:不应多次销毁同一个信号量。一旦信号量被销毁,其指针应被视为无效,并且不应再次使用。
  4. 进程终止:在进程终止时,系统会自动销毁该进程所拥有的所有信号量。

7.3.4等待POSIX信号量
函数原型

int sem_wait(sem_t *sem); //P() 信号量会减1

参数

sem:指向要等待的信号量结构的指针。这个指针必须指向一个已经由 sem_init 初始化的信号量。

返回值

  1. 成功时,sem_init 返回 0。
  2. 失败时,sem_init 返回 -1,并设置 errno 以指示错误类型。

功能

用于对信号量进行 P(等待/减少)操作。当线程调用 sem_wait 时,它会尝试减少信号量的值。如果信号量的值大于 0,则 sem_wait 会成功地将信号量的值减 1,并立即返回。如果信号量的值为 0,则 sem_wait 会阻塞调用线程,直到信号量的值变为大于 0(即有其他线程调用了 sem_post 来增加信号量的值)。

注意

阻塞行为:如果信号量的值为 0,则 sem_wait 会阻塞调用线程。这意味着线程将暂停执行,直到信号量的值变为大于 0。因此,在使用
sem_wait 时应考虑到线程可能被阻塞的情况,并采取相应的措施来避免死锁或优先级反转等问题。

7.3.5发布POSIX信号量
函数原型

int sem_post(sem_t *sem);//信号量+1

参数

sem:指向要增加的信号量结构的指针。这个指针必须指向一个已经由 sem_init 初始化的信号量。

返回值

  1. 成功时,sem_post 返回 0。
  2. 失败时,sem_post 返回 -1,并设置 errno 以指示错误类型。

功能

sem_post 函数的主要作用是增加信号量的值。如果信号量的值在增加之前是 0,并且有其他线程在等待该信号量(即调用了 sem_wait 并被阻塞),则这些线程中的一个(由调度程序决定)将被唤醒并继续执行。如果信号量的值在增加之前已经大于 0,则没有线程会被唤醒,但信号量的值会增加,以备将来使用。

7.4封装POSIX信号量

使用面向对象的方式进行封装

#pragma once
#include <semaphore.h>

namespace SemModule
{
    int defalutsemval = 1;//默认资源数量

    class Sem
    {
    public:
        Sem(int value = defalutsemval)
        :_init_value(value)
        {
            int n = sem_init(&_sem,0,_init_value);
            (void)n;
        }

        //等待
        void P()
        {
            int n = sem_wait(&_sem);
            (void)n;
        }

        //发布
        void V()
        {
            int n = sem_post(&_sem);
            (void)n;
        }

        ~Sem()
        {
            int n = sem_destroy(&_sem);
            (void)n;
        }
    private:
        sem_t _sem; //信号量
        int _init_value;    //资源数量
    };
}

8.基于环形队列的生产消费模型

8.1储存结构

环形队列采用数组进行模拟,通过模运算定位下标。

![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6d28b6419bc14f2db3f853d847d6915c.png

8.2实现

  1. 设置两信号量,一个代表空间的数量,一个代表数据的数量。
  2. 环形队列的判空和判满可以通过信号量来代替,如空间信号量为0时代表满,数据信号量为0时代表为空,这样就不会出现二义性了。
  3. 设置消费者和生产者当前所访问的下标,因为有信号量的约束,它们的下标不会相同(形成了消费者和生产者的互斥和同步)。
  4. 设置两个锁,分别实现生产者和生产者的互斥、消费者和消费者的互斥。

RingBuffer.hpp

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "Sem.hpp"
#include "Mutex.hpp"

namespace RingBufferModule
{
    using namespace SemModule;
    using namespace LockModule;

    // RingQueue
    template <typename T>
    class RingBuffer
    {
    public:
        RingBuffer(int cap)
        :_ring(cap),
        _cap(cap),
        _p_step(0),
        _c_step(0),
        _datasem(0),
        _spacesem(cap)
        {}

        //生产
        void Equeue(const T &in)
        {
            _spacesem.P();  //如果空间为0进行阻塞,大于0进行减一并执行下面操作

            //进行生产
            {   
            LockGuard _lock(_p_lock); //加锁
            _ring[_p_step++] = in;
            _p_step %= _cap;
            }

            _datasem.V();   //数据数目加一
        }

        void Pop(T *out)
        {
            _datasem.P();    //如果空间为0进行阻塞,大于0进行减一并执行下面操作

            //进行消费            
            {
            LockGuard _lock(_c_lock);   //加锁
            *out = _ring[_c_step++];
            _c_step %= _cap;
            }

            _spacesem.V(); //空间数目加一

        }


        ~RingBuffer()
        {}

    private:
        std::vector<T> _ring; // 环, 临界资源
        int _cap;             // 总容量
        int _p_step;          // 生产者位置
        int _c_step;          // 消费位置

        Sem _datasem;         // 数据信号量
        Sem _spacesem;        // 空间信号量

        Mutex _p_lock;         //生产者锁
        Mutex _c_lock;          //消费者锁
    };
}

Main.cc

#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

using namespace RingBufferModule;

void *Consumer(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    while(true)
    {
        sleep(1);
        int data;
        ring_buffer->Pop(&data);

        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void *Productor(void *args)
{
    RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
    int data = 0;
    while (true)
    {
        sleep(1);
        ring_buffer->Equeue(data);
        std::cout << "生产了一个数据: " << data << std::endl;
        data++;
    }
}

int main()
{
    RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源

    pthread_t c1, p1, c2,c3,p2;
    pthread_create(&c1, nullptr, Consumer, ring_buffer);
    pthread_create(&c2, nullptr, Consumer, ring_buffer);
    pthread_create(&c3, nullptr, Consumer, ring_buffer);
    pthread_create(&p1, nullptr, Productor, ring_buffer);
    pthread_create(&p2, nullptr, Productor, ring_buffer);


    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);


    delete ring_buffer;

    return 0;
}

代码总结

  1. 有了互斥和同步,上述代码既可以实现单生产单消费也可以实现多生产多消费。
  2. 生产者生产的可以是数据也可以是任务等。

原文地址:https://blog.csdn.net/2302_79539362/article/details/143979271

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!