自学内容网 自学内容网

生产者消费者模型

            线程同步

  • 互斥锁(互斥量)
  • 条件变量
  • 生产/消费者模型

一、互斥锁

C++11提供了四种互斥锁:

  • mutex:互斥锁。
  • timed_mutex:带超时机制的互斥锁。
  • recursive_mutex:递归互斥锁。
  • recursive_timed_mutex:带超时机制的递归互斥锁。

包含头文件:#include <mutex>

1、mutex类

1)加锁lock()

互斥锁有锁定和未锁定两种状态。

如果互斥锁是未锁定状态,调用lock()成员函数的线程会得到互斥锁的所有权,并将其上锁。

如果互斥锁是锁定状态,调用lock()成员函数的线程就会阻塞等待,直到互斥锁变成未锁定状态。

2)解锁unlock()

只有持有锁的线程才能解锁。

lock和unlock至少满足95%的应用场景!

3)尝试加锁try_lock()

如果互斥锁是未锁定状态,则加锁成功,函数返回true。

如果互斥锁是锁定状态,则加锁失败,函数立即返回false。(线程不会阻塞等待)

2、timed_mutex类

增加了两个成员函数:

bool try_lock_for(时间长度);

bool try_lock_until(时间点);

3、recursive_mutex类

递归互斥锁允许同一线程多次获得互斥锁,可以解决同一线程多次加锁造成的死锁问题。

4、lock_guard类

lock_guard是模板类,可以简化互斥锁的使用,也更安全。

lock_guard的定义如下:

template<class Mutex>
class lock_guard
{
  explicit lock_guard(Mutex& mtx);
}

lock_guard在构造函数中加锁,在析构函数中解锁。

lock_guard采用了RAII思想(在类构造函数中分配资源,在析构函数中释放资源,保证资源在离开作用域时自动释放)。

二、条件变量-生产消费者模型

条件变量

  • 当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。
  • 为了保护共享资源,条件变量需要和互斥锁结合一起使用
  • 生产/消费者模型(高速缓存队列)

255fcf5dd16f4cce8e2a84a585deb1a7.png

条件变量是一种线程同步机制。当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。

C++11的条件变量提供了两个类:

condition_variable:只支持与普通mutex搭配,效率更高。

condition_variable_any:是一种通用的条件变量,可以与任意mutex搭配(包括用户自定义的锁类型)。

包含头文件:<condition_variable>

1、condition_variable类

主要成员函数:

1)condition_variable() 默认构造函数。

2)condition_variable(const condition_variable &)=delete 禁止拷贝。

3)condition_variable& condition_variable::operator=(const condition_variable &)=delete 禁止赋值。

4)notify_one() 通知一个等待的线程。

5)notify_all() 通知全部等待的线程。

6)wait(unique_lock<mutex> lock) 阻塞当前线程,直到通知到达。

7)wait(unique_lock<mutex> lock,Pred pred) 循环的阻塞当前线程,直到通知到达且谓词满足。

8)wait_for(unique_lock<mutex> lock,时间长度)

9)wait_for(unique_lock<mutex> lock,时间长度,Pred pred)

10)wait_until(unique_lock<mutex> lock,时间点)

11)wait_until(unique_lock<mutex> lock,时间点,Pred pred)

重点(wait(mutex)函数):

wait(mutex)做了三件事:

  1. 把互斥锁解锁。

  2. 阻塞,等待被唤醒。

  3. 被唤醒后,给互斥锁加锁。

2、unique_lock类

template <class Mutex> class unique_lock是模板类,模板参数为互斥锁类型。

unique_lock和lock_guard都是管理锁的辅助类,都是RAII风格(在构造时获得锁,在析构时释放锁)。它们的区别在于:为了配合condition_variable,unique_lock还有lock()和unlock()成员函数。

生产者消费者模型类示例:

#include <iostream>
#include <string>
#include <thread>                      // 线程类头文件。
#include <mutex>                      // 互斥锁类的头文件。
#include <deque>                      // deque容器的头文件。
#include <queue>                      // queue容器的头文件。
#include <condition_variable>  // 条件变量的头文件。
using namespace std;
class AA
{
    mutex m_mutex;                                    // 互斥锁。
    condition_variable m_cond;                  // 条件变量。
    queue<string, deque<string>> m_q;   // 缓存队列,底层容器用deque。
public:
    void incache(int num)     // 生产数据,num指定数据的个数。
    {
        lock_guard<mutex> lock(m_mutex);   // 申请加锁。
        for (int ii=0 ; ii<num ; ii++)
        {
            static int bh = 1;           // 超女编号。
            string message = to_string(bh++) + "号超女";    // 拼接出一个数据。
            m_q.push(message);     // 把生产出来的数据入队。
        }
        //m_cond.notify_one();     // 唤醒一个被当前条件变量阻塞的线程。
        m_cond.notify_all();          // 唤醒全部被当前条件变量阻塞的线程。
    }
    
    void outcache()   {    // 消费者线程任务函数。
        while (true)   {
            // 把互斥锁转换成unique_lock<mutex>,并申请加锁。
            unique_lock<mutex> lock(m_mutex);

            // 条件变量虚假唤醒:消费者线程被唤醒后,缓存队列中没有数据。
            //while (m_q.empty())    // 如果队列空,进入循环,否则直接处理数据。必须用循环,不能用if
            //    m_cond.wait(lock);  // 1)把互斥锁解开;2)阻塞,等待被唤醒;3)给互斥锁加锁。
            m_cond.wait(lock, [this] { return !m_q.empty(); });

            // 数据元素出队。
            string message = m_q.front();  m_q.pop();
            cout << "线程:" << this_thread::get_id() << "," << message << endl;
            lock.unlock();      // 手工解锁。

            // 处理出队的数据(把数据消费掉)。
            this_thread::sleep_for(chrono::milliseconds(1));   // 假设处理数据需要1毫秒。
        }
    }
};

int main()
{
    AA aa;
  
    thread t1(&AA::outcache, &aa);     // 创建消费者线程t1。
    thread t2(&AA::outcache, &aa);     // 创建消费者线程t2。
    thread t3(&AA::outcache, &aa);     // 创建消费者线程t3。

    this_thread::sleep_for(chrono::seconds(2));    // 休眠2秒。
    aa.incache(2);      // 生产2个数据。

    this_thread::sleep_for(chrono::seconds(3));    // 休眠3秒。
    aa.incache(5);      // 生产5个数据。

    t1.join();   // 回收子线程的资源。
    t2.join();
    t3.join(); 
}

流程:

  • 程序运行,因为wait把互斥锁解开了,所以三个消费者都能加锁成功,现在wait到了第二步,三个线程都被阻塞在条件变量的wait()函数中,此时互斥锁没有被任何线程占有;
  • 生产者往队列中放完数据后,会发出条件信号;
  • wait()函数接收到i先弄好之后,不一定立即返回,他还要申请加锁,加锁成功后才会返回;
  • 如果wait()返回了,一定申请到了锁,接下来可以让队列中的数据出队,出对后再解锁。

消费者线程中的while(m_q.empty())循环:

条件变量存在虚假唤醒的情况:消费者线程被唤醒后,缓存队列中没有数据(三个消费者线程,一次生产两个数据,然后notifyall,全部唤醒,肯定有一个线程拿不到数据,被虚假唤醒了)。如果被虚假唤醒了应该继续等待下一次通知,所以用if肯定不行,必须用while。

也可以用wait(unique_lock<mutex> lock,Pred pred) 版本,添加一个谓词:

m_cond.wait(lock, [this] { return !m_q.empty(); });

效果是一样的,本质上这个重载的wait函数中也有个while循环。

 

 

 

 

 

 

 

 

 

 

 

 

 


原文地址:https://blog.csdn.net/weixin_56520780/article/details/143671309

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!