Linux——线程同步与生产者消费者模型
目录
前言
之前我们学习了线程互斥,知道了系统为保护临界资源,需要让线程互斥,一个个的访问资源,同时我们也发现线程饥饿的问题,就是某一个线程一直持有锁,释放锁,再持有锁,释放锁,其他线程根本就竞争不过他,因此又提出了线程同步,让线程有序的进行申请锁资源。
一、线程同步
我们讲个小故事
- 之前,我们想象中的同步就是夫妻两人你看你的斗破苍穹,我玩我的英雄联盟。我们两个一起进行且互不干扰。
- 但是今天,我们都要向存钱罐里拿钱,你为了去买更好的手机看小说,我为了去买更好的电脑打游戏。
- 我们也不能一起将手往存钱罐里面摸,因为存钱罐口就那么大,因此就得互斥的去拿钱,也就是我拿的时候你不能拿,你拿的时候我不能拿。
- 但是你也不能太霸道的,比如说你一直去拿钱,刚拿完钱出来,又将手伸过去拿,导致钱都被你拿光了,我一直无法从里面拿钱,那我就饥饿了。
- 这样长此以往肯定不行,那么现在规则修改了,从存钱灌里面拿钱之后,不能再立刻申请,而应该在后面排队,这样每个人都可以拿到钱干自己想干的事情。
在这个故事中
- 夫妻二人就是两个线程
- 钱是临界资源
- 存钱罐口就是一把锁
- 同一时间只能一个人拿钱叫做线程互斥
- 按顺序拿钱就是线程同步
线程同步是在临界资源使用安全的前提下,让多线程执行具有一定的顺序性。
互斥能保证资源的安全,同步能较为充分高效的使用资源。
二、生产者消费者模型
在我们生活中,超市是典型的生产者消费者模型。生产者生成商品放入超市,消费者进入超市消费商品。
在计算机生产者消费者模型中,生产者是线程,消费者也是线程。生产线程生成数据,消费线程消费(处理)数据。而超市本质上就是内存空间。
在这个过程中,我们必须保证生产消费过程是安全的!
- 不可能说你想在超市的最好的地方放东西,我也想在这里放东西,这样就会发生数据覆盖的现象,因此生产者之间是需要互斥的。
- 而超市的资源(数据)是很珍贵的,如果两人一起访问并做处理,可能会发生数据紊乱或者浪费消费者资源的事情。因此消费者之间也是互斥的!
- 而生产者和消费者之间,必须也是互斥的,因为你放入完毕之后我才能来拿,如果你刚放入一点,比如想放入“hello world”,只放入了“hello”我就来拿走,也会发生问题。
- 同时生产者和消费者之间,还得是同步的。如果仅仅是互斥,那么可以连续一个月生产者都在往里面放资源,消费者没机会来消费,那就效率太低下了。同步之后,生产者发现我生产的东西比较多了,就让消费者来拿,消费者看见超市里面没什么东西了,就让生产者生产。这样就可以保证生产者消费者模型更好的运转。
小总结
生产者之间互斥
消费者之间互斥
生产者与消费者之间互斥+同步
三、条件变量
1.理解条件变量
我们说了这么多的同步与生产者消费者模型,那如何实现线程同步呢?pthread库给我们提供了条件变量。
我们讲个小故事来理解条件变量
- 有一个拿苹果游戏,一个人往箱子里放苹果,另一个人蒙着眼睛从箱子里拿苹果,箱子只有一个口,就是要么只允许放,要么只允许拿。
- 拿苹果的人需要尽快拿到苹果,但又不清楚放苹果的人什么时候会放入苹果,于是他疯狂的从箱子里面拿,刚摸了一下发现没苹果,过了几秒不放心,感觉可能放苹果的人已经放入了苹果,于是又想去拿苹果。
- 就这样,拿苹果的人一直在申请拿苹果,人家放苹果的人就算到来了也来不及放,因为你一直拿,占用着锁。
- 于是乎,现在你们想了一个办法,就是你先不要来拿,你等我发消息,当我放入的时候会给你发送消息,你现在再来拿苹果。
- 这个消息就是条件变量!!!!
如果拿苹果的人很多,他们也需要排队,那么条件变量中肯定要维护一个线程队列,同时还要有个标志位判断条件是否就绪。
2.条件变量接口
2.1 条件变量初始化与销毁
如下是条件变量的初始化和销毁接口,特别类似于互斥锁。
定义全局:PTHREAD_COND_INITAILIZER 直接初始化即可
定义局部:pthread_cond_init()
- 参数cond:pthread_cond_t cond创建条件变量,将&cond传入即可。
- 参数attr:设置条件变量属性,默认传NULL;
销毁:pthread_cond_destroy()
- 参数:&cond
2.2 条件变量等待
条件变量等待接口如下,(等待的是资源就绪)
pthread_cond_wait()
- 参数cond:需要等待的条件变量
- 参数mutex:需要的锁
- 返回值:成功返回0,失败返回错误原因。
2.3 条件变量唤醒等待
条件变量唤醒等待接口如下
pthread_cond_signal()作用是使指定的条件变量成立,并且唤醒一个线程。
pthread_cond_broadcast()作用是使指定的条件变量成立,并且唤醒所有线程。
他们两个的参数都是 &cond
2.4 条件变量接口运用
如下是互斥代码,通过打印观察运行情况
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *threadRoutine(void *args)
{
string name = static_cast<const char*> (args);
while (true)
{
sleep(1);
pthread_mutex_lock(&mutex);
cout<<"I am a new thread : "<<name<<endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread_1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread_2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread_3");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
}
运行发现现在仅仅是互斥,并没有同步,因为顺序不一致
现在我们添加上条件变量代码,让线程开始等待,于是所有运行该函数的线程都会停留在wait处进行等待。而我们主线程,过了5秒开始唤醒等待,看看线程运行情况。
现在就是有一定的顺序性了,因为被唤醒的线程处理完毕后又会去进行等待,此时是添加在了等待队列的末尾。由于每次唤醒都要sleep(1),因此每一行都需要等一秒才能打印
如果我们将唤醒切换成broadcast,那么就是每一秒打印三次,因为有三个进程都被唤醒了。依然是有序的
2.5 条件变量进行抢票
现在我们让线程都去抢票,没票的时候就进行等待
那么现在没票之后,线程就不会疯狂的申请锁,释放锁,而是进行等待,等待后续还有票,你就给我发送唤醒,我再开始抢票。
现在我们每隔5秒放一百张票,放完票就进行唤醒,让线程再去抢票
现在没票线程就等待,有票线程就开始抢,能够很好的控制线程抢票了。
3.条件变量的细节
刚才我们看到,单纯的互斥,能保证数据安全,但不一定合理或者高效。因为如果不进行条件变量等待,那么线程会一直申请锁,发现没有票,再释放锁,while循坏又会来申请锁。可能主线程想要申请锁进行放票,都没有机会。
现在让线程再pthread_cond_wait这里进行等待,没有问题,但是你等待的时候必须要释放锁,因为如果你不释放,其他线程也就进不来。这也是为何pthread_cond_wait不仅仅要传环境变量,还要传mutux。
同时,线程被唤醒的时候,是在临界区中被唤醒的,当线程被唤醒,线程在pthread_cond_wait返回时,需要重新申请锁。
四、基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。(也就是队列为空不能消费者不能消费,队列为满生产者不能生产)
我们可以通过控制,来让生产和消费保持同步
1.单线程版
我们利用C++提供的queue来封装阻塞队列,生产者往队列里面入数据,并发送唤醒让消费者消费,消费者往队列里面出数据,并发送唤醒让生产者生产数据。
BlockQueue.hpp代码如下
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int defaultcap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity = defaultcap)
:_capacity(capacity)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
bool IsFull()
{
return _q.size()==_capacity;
}
bool IsEmpty()
{
return _q.size()==0;
}
void Push(const T& in)//生产者
{
pthread_mutex_lock(&_mutex);
while(IsFull())
{
//阻塞等待
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
void Pop(T* out) //消费者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
//阻塞等待
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;//生产者条件变量
pthread_cond_t _c_cond;//消费者条件变量
};
main.cc如下
#include "BlockQueue.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int data = 0;
bq->Pop(&data);
cout<<"consumer data: "<<data<<endl;
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int data = rand()%10+1;
bq->Push(data);
cout<<"productor data:"<<data<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr));
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, (void *)bq);
pthread_create(&p, nullptr, productor, (void *)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
}
运行就会发现,你生产一个,我就消费一个,不仅仅是单纯的互斥,而是进行了同步。不会再有线程饥饿的问题。
2.使用LockGuard
我们使用LockGuard来创建锁,让他创建时直接加锁,出了作用域析构自动解锁。
LockGuard.hpp代码如下
#pragma once
#include <pthread.h>
// 不定义锁,外部会传递锁
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
void Lock()
{
pthread_mutex_lock(_lock);
}
void UnLock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}
private:
Mutex _mutex;
};
修改如下
3.多线程版
多线程版很简单,因为我们的代码本来就是互斥的,因为在生产和消费都进行了上锁。因此直接创建线程就好了。
我们封装一下BlockQueue,多一个线程名字,同时创建两个生产线程与两个消费线程。
运行可以看到,生产线程一下生产两个,消费线程也一下消费两个,四个一组,很好的配合起来了。
最后附上总代码
BlockQueue.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include "LockGuard.hpp"
using namespace std;
const int defaultcap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity = defaultcap)
:_capacity(capacity)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
bool IsFull()
{
return _q.size()==_capacity;
}
bool IsEmpty()
{
return _q.size()==0;
}
void Push(const T& in)//生产者
{
LockGuard lockgaurd(&_mutex);
// pthread_mutex_lock(&_mutex);
while(IsFull())
{
//阻塞等待
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
// pthread_mutex_unlock(&_mutex);
}
void Pop(T* out) //消费者
{
LockGuard lockguard(&_mutex);
// pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
//阻塞等待
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
// pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;//生产者条件变量
pthread_cond_t _c_cond;//消费者条件变量
};
string Getname(int i)
{
string s = "pthread_";
s+=to_string(i);
return s;
}
template<class T>
class ThreadData
{
public:
ThreadData(BlockQueue<T>* bq)
:_bq(bq)
{
_name = Getname(i++);
}
public:
BlockQueue<T>* _bq;
string _name;
static int i;
};
template<class T>
int ThreadData<T>::i = 1;
LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,外部会传递锁
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
void Lock()
{
pthread_mutex_lock(_lock);
}
void UnLock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}
private:
Mutex _mutex;
};
Main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
ThreadData<int> *td = static_cast<ThreadData<int> *>(args);
while (true)
{
int data = 0;
td->_bq->Pop(&data);
cout<<"consumer data: "<<data<<"线程名: "<<td->_name<<endl;
}
return nullptr;
}
void *productor(void *args)
{
ThreadData<int> *td = static_cast<ThreadData<int> *>(args);
while (true)
{
int data = rand()%10+1;
td->_bq->Push(data);
cout<<"productor data:"<<data<<"线程名: "<<td->_name<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr));
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c[2], p[2];
ThreadData<int> *td1 = new ThreadData<int>(bq);
pthread_create(&c[0], nullptr, consumer, td1);
ThreadData<int> *td2 = new ThreadData<int>(bq);
pthread_create(&p[0], nullptr, productor, td2);
ThreadData<int> *td3 = new ThreadData<int>(bq);
pthread_create(&c[1], nullptr, consumer, td3);
ThreadData<int> *td4 = new ThreadData<int>(bq);
pthread_create(&p[1], nullptr, productor, td4);
pthread_join(c[0],nullptr);
pthread_join(p[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[1],nullptr);
}
原文地址:https://blog.csdn.net/kkbca/article/details/137513688
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!