自学内容网 自学内容网

初识Linux · 编写生产消费模型(2)

目录

前言:

RingQueue编写生产消费模型

认识接口

开始编写


前言:

前文我们介绍了基于阻塞队列实现生产消费模型,使用阻塞队列实现生产消费模型中,我们学习到了pthread_cond_wait的第二个参数的重要性,不仅会解锁,此时锁被其他人持有,当条件满足的时候,就重新竞争锁,所以在pthread_cond_wait函数这里是不会存在死锁的。

第二个重要的条件是while,首先不说函数等待失败,直接造成错误的情况,如果多个消费者同时在等待,但是只有一把锁,这种情况就需要重新判断,所以使用的while,以上是前文两个比较重要的讨论。

但是纵观全文,我们发现基于阻塞队列实现生产消费模型的时候,是必须要对条件是否满足进行判断的,于是就有人思考,可不可以让其他东西代替我们判断条件是否满足呢?毕竟我们这里代码量小,不容易出错,对于代码量的情况来说的话,就不好说了,所以我们应该借助什么东西呢?

答案是信号量,对于信号量存在的PV操作,以及信号量是将整个资源划分成一个一个的小部分,我们在进程间通信部分已经介绍了,这里我们不再花费更多的时间介绍信号量,但是我们有必要认识一下信号量的接口,基于信号量编写生产消费模型我们可以尝试使用环形队列,我们使用vector来模拟。

更多细节就直接进入主题吧!


RingQueue编写生产消费模型

认识接口

对于信号量的接口,Ubuntu系统也可以可以man到的,我们需要用到的接口有sem_init,sem_wait和sem_post,sem_destroy,这是在POSIX里面的信号量的操作,而不是system V的操作,两者要分清楚,那么对应的头文件是semaphore.h文件:

int sem_init(sem_t *sem, int pshared, unsigned int value);

int sem_wait(sem_t *sem);

int sem_post(sem_t *sem);

int sem_destroy(sem_t *sem);

其中稍微复杂一点就是sem_init函数了,其他的多简单,有了线程学习之前的基础,我们基本上可以直接使用了。sem_init函数的第一个参数是sem_t类型的,和phtread_t的一样,第二个参数我们直接设置为0,这个参数决定的是线程间共享信号量还是进程间共享信号量,0代表的线程间共享,第三个参数就是申请多少个信号量。

对于信号量来说,在System -V里面,我们简短的介绍了pv操作,对于P操作相当于信号量--,对于V操作相当于信号量++,而因为是对临界资源的访问,所以这两个操作应该是原子的。

对于以上函数的返回值都是成功返回0,失败返回-1,并且错误码被设置。

可是为什么使用以上的函数就可以不用判断条件是否满足了呢?

  1. sem_wait()函数

    • 功能:相当于P操作,用于等待信号量变为正数(即请求资源)。如果信号量的值为0,则调用线程将被阻塞,直到信号量的值大于0为止。
    • 参数:包括指向信号量对象的指针(sem)。
    • 返回值:成功时返回0,失败时返回-1并设置errno。
  2. sem_post()函数

    • 功能:相当于V操作,用于释放信号量(即释放资源)。信号量的值将增加1。如果有任何线程在等待该信号量,则其中一个线程将被唤醒。
    • 参数:包括指向信号量对象的指针(sem)。
    • 返回值:成功时返回0,失败时返回-1并设置errno。

因为PV操作的函数,会自动判断信号量,或者说条件是否满足,如果满足,那么就操作线程。

以上是对于sem_*函数的介绍。

开始编写

同前文的blockqueue一样,我们先来确定成员变量应该有谁?

首先,我们既然是基于环形队列和信号量编写的,那么生产者和消费者的位置,我们应该知道吧?那么就应该有两个变量用来表示位置。对于位置来说,后续操作肯定是免不了%操作的,虽然有PV操作,但是我们应该防止越界。

其次,信号量的变量肯定要有吧?在构造函数和析构函数的时候初始化 + 析构就可以了。可是我们应该引入几个信号量呢?在最开始生产者生产的时候,消费者一个信号量都不能消费吧?那么这不就是初识信号量为0吗?当生产者进行了V操作之后,消费者的信号量+1(重点),消费者消费了同理,所以应该有两个信号量。

最后,对于锁来说,我们前文加锁是为了防止对于临界资源的访问出错,这里需要加吗?当然要加了,对于环形队列的访问难道不是临界资源吗?当然是了,所以同样需要锁,可是需要几把锁呢?一把锁吗?如果只用一把锁,那不就是基于阻塞队列编写的吗?(只能有一个人操作)这里既然有了信号量的加持,都有人帮我们判断条件了,我们不妨设计两把锁,因为生产者和消费者是并发进行的,要满足消费者和消费者,生产者和生产者之间的关系。即互斥。对于生产者和消费者之间,我们都不用担心互斥,信号量已经帮我们做了,我们只要保证同步即可。

const int default_cap = 5;

template<typename T>
class RingQueue
{

public:

private:
    std::vector<T> _ring_queue;
    int _max_cap;

    int _c_step;
    int _p_step;

    sem_t _data_sem;
    sem_t _space_sem;

    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;
};

那么就是正常的构造函数和析构函数:

const int default_cap = 5;

template<typename T>
class RingQueue
{

public:
    RingQueue(int max_cap = default_cap)
     : _max_cap(max_cap), _c_step(0), _p_step(0), _ring_queue(max_cap)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, _max_cap);

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }

    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }


private:
    std::vector<T> _ring_queue;
    int _max_cap;

    int _c_step;
    int _p_step;

    sem_t _data_sem;
    sem_t _space_sem;

    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;
};

有了析构和构造,我们现在只需要关心pop和push了,其中我们不妨简单封装一下PV操作的函数:

private:
    void P(sem_t& sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t& sem)
    {
        sem_post(&sem);
    }

对于push操作,因为访问了临界资源,所以一定要加锁,加锁之后,生产者的位置需要++,并且要保证++之后不会被超出队列的总长度,那么就要模运算,就没了:

    void Push(const T& in)
    {
        P(_space_sem);
        pthread_mutex_lock(&_p_mutex);
        _ring_queue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        pthread_mutex_unlock(&_p_mutex);   
        V(_data_sem);
    }

对于PV操作一定是二者都要同时进行的,不会只执行单独的一个,都是全部执行。

对于pop操作一样的:

    void Pop(T* out)
    {
        P(_data_sem);
        pthread_mutex_lock(&_c_mutex);
        *out = _ring_queue[_c_step];
        _c_step++;
        _c_step %= _max_cap;
        pthread_mutex_unlock(&_c_mutex);
        V(_space_sem);
    }

对于头文件RingQueue.hpp的编写就结束了,我们在主函数部分编写测试代码试试:

void *Consumer(void *args)
{
    RingQueue<int> *c = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int out = 0;
        c->Pop(&out);
        std::cout << "Consumer pop data-> " << out << std::endl;
        //sleep(1);
    }
}

void *Productor(void *args)
{
    RingQueue<int> *q = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int in = rand() % 100 + 1;
        q->Push(in);
        std::cout << "Productor push data-> " << in << std::endl;
        // sleep(1);
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    pthread_t c, p;
    RingQueue<int> *rq = new RingQueue<int>;
    pthread_create(&c, nullptr, Consumer, (void *)rq);
    pthread_create(&p, nullptr, Productor, (void *)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

以上是main函数的编写,实际上,这份代码也支持多生产多消费,毕竟锁在那里,那么以上就是环形队列编写生产消费模型的介绍。


感谢阅读!


原文地址:https://blog.csdn.net/2301_79697943/article/details/144345057

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!