自学内容网 自学内容网

Linux——线程同步与生产者消费者模型

目录

前言

一、线程同步

二、生产者消费者模型

三、条件变量

1.理解条件变量

2.条件变量接口

2.1 条件变量初始化与销毁

2.2 条件变量等待

 2.3 条件变量唤醒等待

2.4 条件变量接口运用

2.5 条件变量进行抢票 

3.条件变量的细节 

四、基于BlockingQueue的生产者消费者模型

1.单线程版 

2.使用LockGuard 

3.多线程版 


前言

之前我们学习了线程互斥,知道了系统为保护临界资源,需要让线程互斥,一个个的访问资源,同时我们也发现线程饥饿的问题,就是某一个线程一直持有锁,释放锁,再持有锁,释放锁,其他线程根本就竞争不过他,因此又提出了线程同步,让线程有序的进行申请锁资源

一、线程同步

我们讲个小故事

  1. 之前,我们想象中的同步就是夫妻两人你看你的斗破苍穹,我玩我的英雄联盟。我们两个一起进行且互不干扰。
  2. 但是今天,我们都要向存钱罐里拿钱,你为了去买更好的手机看小说,我为了去买更好的电脑打游戏。
  3. 我们也不能一起将手往存钱罐里面摸,因为存钱罐口就那么大,因此就得互斥的去拿钱,也就是我拿的时候你不能拿,你拿的时候我不能拿。
  4. 但是你也不能太霸道的,比如说你一直去拿钱,刚拿完钱出来,又将手伸过去拿,导致钱都被你拿光了,我一直无法从里面拿钱,那我就饥饿了。
  5. 这样长此以往肯定不行,那么现在规则修改了,从存钱灌里面拿钱之后,不能再立刻申请,而应该在后面排队,这样每个人都可以拿到钱干自己想干的事情。

在这个故事中

  • 夫妻二人就是两个线程
  • 钱是临界资源
  • 存钱罐口就是一把锁
  • 同一时间只能一个人拿钱叫做线程互斥
  • 按顺序拿钱就是线程同步

线程同步是在临界资源使用安全的前提下,让多线程执行具有一定的顺序性

互斥能保证资源的安全,同步能较为充分高效的使用资源

二、生产者消费者模型

在我们生活中,超市是典型的生产者消费者模型。生产者生成商品放入超市,消费者进入超市消费商品。

在计算机生产者消费者模型中,生产者是线程,消费者也是线程。生产线程生成数据,消费线程消费(处理)数据。而超市本质上就是内存空间

在这个过程中,我们必须保证生产消费过程是安全的! 

  • 不可能说你想在超市的最好的地方放东西,我也想在这里放东西,这样就会发生数据覆盖的现象,因此生产者之间是需要互斥的
  • 而超市的资源(数据)是很珍贵的,如果两人一起访问并做处理,可能会发生数据紊乱或者浪费消费者资源的事情。因此消费者之间也是互斥的
  • 生产者和消费者之间,必须也是互斥的,因为你放入完毕之后我才能来拿,如果你刚放入一点,比如想放入“hello world”,只放入了“hello”我就来拿走,也会发生问题。
  • 同时生产者和消费者之间,还得是同步的。如果仅仅是互斥,那么可以连续一个月生产者都在往里面放资源,消费者没机会来消费,那就效率太低下了。同步之后,生产者发现我生产的东西比较多了,就让消费者来拿,消费者看见超市里面没什么东西了,就让生产者生产。这样就可以保证生产者消费者模型更好的运转。

小总结

生产者之间互斥

消费者之间互斥

生产者与消费者之间互斥+同步 

三、条件变量

1.理解条件变量

我们说了这么多的同步与生产者消费者模型,那如何实现线程同步呢?pthread库给我们提供了条件变量。

我们讲个小故事来理解条件变量

  • 有一个拿苹果游戏,一个人往箱子里放苹果,另一个人蒙着眼睛从箱子里拿苹果,箱子只有一个口,就是要么只允许放,要么只允许拿。
  • 拿苹果的人需要尽快拿到苹果,但又不清楚放苹果的人什么时候会放入苹果,于是他疯狂的从箱子里面拿,刚摸了一下发现没苹果,过了几秒不放心,感觉可能放苹果的人已经放入了苹果,于是又想去拿苹果。
  • 就这样,拿苹果的人一直在申请拿苹果,人家放苹果的人就算到来了也来不及放,因为你一直拿,占用着锁。
  • 于是乎,现在你们想了一个办法,就是你先不要来拿,你等我发消息,当我放入的时候会给你发送消息,你现在再来拿苹果。
  • 这个消息就是条件变量!!!!

如果拿苹果的人很多,他们也需要排队,那么条件变量中肯定要维护一个线程队列,同时还要有个标志位判断条件是否就绪。

2.条件变量接口

2.1 条件变量初始化与销毁

如下是条件变量的初始化和销毁接口,特别类似于互斥锁。

定义全局:PTHREAD_COND_INITAILIZER  直接初始化即可

定义局部:pthread_cond_init()

  • 参数cond:pthread_cond_t cond创建条件变量,将&cond传入即可。
  • 参数attr:设置条件变量属性,默认传NULL;

销毁:pthread_cond_destroy()

  • 参数:&cond

2.2 条件变量等待

条件变量等待接口如下,(等待的是资源就绪)

pthread_cond_wait()

  • 参数cond:需要等待的条件变量
  • 参数mutex:需要的锁
  • 返回值:成功返回0,失败返回错误原因。 

 2.3 条件变量唤醒等待

条件变量唤醒等待接口如下

pthread_cond_signal()作用是使指定的条件变量成立,并且唤醒一个线程

pthread_cond_broadcast()作用是使指定的条件变量成立,并且唤醒所有线程

他们两个的参数都是 &cond

2.4 条件变量接口运用

如下是互斥代码,通过打印观察运行情况

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

using namespace std;

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *threadRoutine(void *args)
{
    string name = static_cast<const char*> (args);
    while (true)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        cout<<"I am a new thread : "<<name<<endl;
        pthread_mutex_unlock(&mutex); 
    }
    
}

int main()
{
    
    pthread_t t1,t2,t3;
    pthread_create(&t1,nullptr,threadRoutine,(void*)"thread_1");
    pthread_create(&t2,nullptr,threadRoutine,(void*)"thread_2");
    pthread_create(&t3,nullptr,threadRoutine,(void*)"thread_3");

    pthread_join(t1,nullptr);
    pthread_join(t2,nullptr);
    pthread_join(t3,nullptr);
}

运行发现现在仅仅是互斥,并没有同步,因为顺序不一致 

现在我们添加上条件变量代码,让线程开始等待,于是所有运行该函数的线程都会停留在wait处进行等待。而我们主线程,过了5秒开始唤醒等待,看看线程运行情况。

现在就是有一定的顺序性了,因为被唤醒的线程处理完毕后又会去进行等待,此时是添加在了等待队列的末尾。由于每次唤醒都要sleep(1),因此每一行都需要等一秒才能打印

如果我们将唤醒切换成broadcast,那么就是每一秒打印三次,因为有三个进程都被唤醒了。依然是有序的

2.5 条件变量进行抢票 

 现在我们让线程都去抢票,没票的时候就进行等待

那么现在没票之后,线程就不会疯狂的申请锁,释放锁,而是进行等待,等待后续还有票,你就给我发送唤醒,我再开始抢票。 

现在我们每隔5秒放一百张票,放完票就进行唤醒,让线程再去抢票 

 现在没票线程就等待,有票线程就开始抢,能够很好的控制线程抢票了。

3.条件变量的细节 

刚才我们看到,单纯的互斥,能保证数据安全,但不一定合理或者高效。因为如果不进行条件变量等待,那么线程会一直申请锁,发现没有票,再释放锁,while循坏又会来申请锁。可能主线程想要申请锁进行放票,都没有机会。

现在让线程再pthread_cond_wait这里进行等待,没有问题,但是你等待的时候必须要释放锁,因为如果你不释放,其他线程也就进不来。这也是为何pthread_cond_wait不仅仅要传环境变量,还要传mutux。

同时,线程被唤醒的时候,是在临界区中被唤醒的,当线程被唤醒,线程在pthread_cond_wait返回时,需要重新申请锁

四、基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。(也就是队列为空不能消费者不能消费,队列为满生产者不能生产

我们可以通过控制,来让生产和消费保持同步

1.单线程版 

我们利用C++提供的queue来封装阻塞队列,生产者往队列里面入数据,并发送唤醒让消费者消费,消费者往队列里面出数据,并发送唤醒让生产者生产数据。

BlockQueue.hpp代码如下

#pragma once

#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;

const int defaultcap = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int capacity = defaultcap)
        :_capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
    }

    bool IsFull()
    {
        return _q.size()==_capacity;
    }

    bool IsEmpty()
    {
        return _q.size()==0;
    }

    void Push(const T& in)//生产者
    {
        pthread_mutex_lock(&_mutex);
        while(IsFull())
        {
            //阻塞等待
            pthread_cond_wait(&_p_cond,&_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

    void Pop(T* out)    //消费者
    {
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            //阻塞等待
            pthread_cond_wait(&_c_cond,&_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }
private:
    queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;//生产者条件变量
    pthread_cond_t _c_cond;//消费者条件变量
};

main.cc如下

#include "BlockQueue.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int data = 0;
        bq->Pop(&data);
        cout<<"consumer data: "<<data<<endl;
    }
    return nullptr;
}

void *productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int data = rand()%10+1;
        bq->Push(data);
        cout<<"productor data:"<<data<<endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr));
    BlockQueue<int> *bq = new BlockQueue<int>();

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, (void *)bq);
    pthread_create(&p, nullptr, productor, (void *)bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
}

运行就会发现,你生产一个,我就消费一个,不仅仅是单纯的互斥,而是进行了同步。不会再有线程饥饿的问题。

2.使用LockGuard 

 我们使用LockGuard来创建锁,让他创建时直接加锁,出了作用域析构自动解锁。

LockGuard.hpp代码如下

#pragma once
#include <pthread.h>

// 不定义锁,外部会传递锁
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock)
        : _lock(lock)
    {
    }
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void UnLock()
    {
        pthread_mutex_unlock(_lock);
    }
    ~Mutex()
    {
    }

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock)
        : _mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.UnLock();
    }
private:
    Mutex _mutex;
};

 修改如下

3.多线程版 

多线程版很简单,因为我们的代码本来就是互斥的,因为在生产和消费都进行了上锁。因此直接创建线程就好了。

我们封装一下BlockQueue,多一个线程名字,同时创建两个生产线程与两个消费线程。

 运行可以看到,生产线程一下生产两个,消费线程也一下消费两个,四个一组,很好的配合起来了。

最后附上总代码 

BlockQueue.hpp

#pragma once

#include<iostream>
#include<queue>
#include<pthread.h>
#include "LockGuard.hpp"
using namespace std;

const int defaultcap = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int capacity = defaultcap)
        :_capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
    }

    bool IsFull()
    {
        return _q.size()==_capacity;
    }

    bool IsEmpty()
    {
        return _q.size()==0;
    }

    void Push(const T& in)//生产者
    {
        LockGuard lockgaurd(&_mutex);
        // pthread_mutex_lock(&_mutex);
        while(IsFull())
        {
            //阻塞等待
            pthread_cond_wait(&_p_cond,&_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_c_cond);
        // pthread_mutex_unlock(&_mutex);
    }

    void Pop(T* out)    //消费者
    {
        LockGuard lockguard(&_mutex);
        // pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            //阻塞等待
            pthread_cond_wait(&_c_cond,&_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        // pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }
private:
    queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;//生产者条件变量
    pthread_cond_t _c_cond;//消费者条件变量
};

string Getname(int i)
{
    string s = "pthread_";
    s+=to_string(i);
    return s;
}


template<class T>
class ThreadData
{
public:
    ThreadData(BlockQueue<T>* bq)
        :_bq(bq)
    {
        _name = Getname(i++);
    }
public:
    BlockQueue<T>* _bq;
    string _name;
    static int i;
};

template<class T>
int ThreadData<T>::i = 1;

LockGuard.hpp

#pragma once
#include <pthread.h>

// 不定义锁,外部会传递锁
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock)
        : _lock(lock)
    {
    }
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void UnLock()
    {
        pthread_mutex_unlock(_lock);
    }
    ~Mutex()
    {
    }

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock)
        : _mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.UnLock();
    }
private:
    Mutex _mutex;
};

Main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>


void *consumer(void *args)
{
    ThreadData<int> *td = static_cast<ThreadData<int> *>(args);
    while (true)
    {
        int data = 0;
        td->_bq->Pop(&data);
        cout<<"consumer data: "<<data<<"线程名: "<<td->_name<<endl;
    }
    return nullptr;
}

void *productor(void *args)
{
    ThreadData<int> *td = static_cast<ThreadData<int> *>(args);
    while (true)
    {
        int data = rand()%10+1;
        td->_bq->Push(data);
        cout<<"productor data:"<<data<<"线程名: "<<td->_name<<endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr));

    BlockQueue<int> *bq = new BlockQueue<int>();
    
    pthread_t c[2], p[2];

    ThreadData<int> *td1 = new ThreadData<int>(bq);
    pthread_create(&c[0], nullptr, consumer, td1);

    ThreadData<int> *td2 = new ThreadData<int>(bq);
    pthread_create(&p[0], nullptr, productor, td2);

    ThreadData<int> *td3 = new ThreadData<int>(bq);
    pthread_create(&c[1], nullptr, consumer, td3);

    ThreadData<int> *td4 = new ThreadData<int>(bq);
    pthread_create(&p[1], nullptr, productor, td4);


    pthread_join(c[0],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[1],nullptr);
}


原文地址:https://blog.csdn.net/kkbca/article/details/137513688

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!