自学内容网 自学内容网

Linux——线程条件变量(同步)

Linux——多线程的控制-CSDN博客


文章目录


目录

文章目录

前言

一、条件变量是什么?

1、死锁的必要条件

1. 互斥条件(Mutual Exclusion)

2. 请求和保持条件(Hold and Wait)

3. 不可剥夺条件(No Preemption)

4. 循环等待条件(Circular Wait)

2、同步概念与竞态条件

 3、条件变量的必要条件

 1 、与互斥锁配合使用

2. 条件检查的持续性

二、条件变量的使用及函数

1.条件变量的工作流程:

工作流程

2、条件变量所需函数

1、互斥锁函数

2、条件变量函数

3、cp问题(生产者 - 消费者问题)

四、设计基本的条件变量代码:

总结


前言

上篇博客我们介绍基础的线程控制及线程的锁;还有互斥,那么有了互斥就有同步;

我们介绍互斥是多个线程去抢占一个共享资源对其操作,从而导致共享资源的紊乱,为了解决这个问题我们引入了锁,对其加锁了以后共享资源的处理就不会出现同时进行处理了;

那么对于一个共享资源我们想要同时对其操作有应该怎么办呢?

那么我们就介绍一下同步机制。


一、条件变量是什么?

1、死锁的必要条件

1. 互斥条件(Mutual Exclusion)

  • 条件解释
    • 资源在同一时间只能被一个进程或线程使用。
  • 生活案例
    • 假设有一个卫生间,只有一个马桶,一次只能一个人使用。A 和 B 两个人都想使用这个马桶,A 进去后锁上了门,B 就只能在外面等待,此时这个马桶对于 A 和 B 来说就是互斥资源,A 正在使用时,B 无法使用,反之亦然。

2. 请求和保持条件(Hold and Wait)

  • 条件解释
    • 进程或线程已经持有了至少一个资源,但又提出了新的资源请求,而该资源被其他进程或线程占用,同时又不释放自己已经持有的资源。
  • 生活案例
    • 还是卫生间的例子,A 进入卫生间后,拿着卫生纸(资源 1),但发现没有洗发水(资源 2),此时 A 不放下卫生纸,继续等待洗发水,而洗发水在 B 那里,B 进入另一个卫生间拿着洗发水,等待卫生纸。A 持有卫生纸请求洗发水,B 持有洗发水请求卫生纸,并且都不释放自己已有的资源,就形成了请求和保持的情况。

3. 不可剥夺条件(No Preemption)

  • 条件解释
    • 进程或线程所获得的资源在未使用完之前,不能被其他进程或线程强行剥夺,只能由自己释放。
  • 生活案例
    • 继续上述例子,A 已经进入卫生间并使用了卫生纸,其他人不能强行进入卫生间从 A 手中夺走卫生纸,只有 A 自己用完卫生纸后才会将其释放,B 对洗发水也是同样的情况,别人无法强行从 B 手中夺走洗发水,这就是资源的不可剥夺性。

4. 循环等待条件(Circular Wait)

  • 条件解释
    • 存在一种进程或线程的循环等待链,链中的每个进程或线程都在等待下一个进程或线程所占有的资源。
  • 生活案例
    • 在卫生间场景中,A 拿着卫生纸等待洗发水,B 拿着洗发水等待卫生纸,A 等待 B 释放洗发水,B 等待 A 释放卫生纸,形成了一个循环等待的链条。

2、同步概念与竞态条件

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步;

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

在多线程编程中,条件变量是一种非常重要的同步机制,其主要作用是允许线程在某些条件未满足时进入等待状态,并在条件满足时被唤醒。

  • 阻塞等待

    • 条件变量允许一个线程在等待某个特定条件满足时进入阻塞状态,从而避免了线程的忙等待(busy-waiting)。忙等待是指线程在一个循环中不断检查条件是否满足,这会浪费 CPU 资源。例如,在生产者 - 消费者问题中,消费者线程在缓冲区为空时可以使用条件变量等待,而不是不断检查缓冲区是否有数据,这样可以将 CPU 资源让给其他线程。
    • 例如,一个线程可能需要等待某个资源被其他线程释放,或者等待某个事件发生,使用条件变量可以让线程进入休眠状态,直到条件满足。
  • 唤醒等待线程

    • 当条件满足时,另一个线程可以通过条件变量唤醒一个或多个等待的线程。这使得线程之间可以协调工作,避免了复杂的轮询机制。
    • 例如,在生产者 - 消费者问题中,当生产者将数据放入缓冲区后,可以使用条件变量通知等待的消费者线程,让其可以开始消费数据。
    • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

      例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

 3、条件变量的必要条件

 1 、与互斥锁配合使用

  • 条件
    • 必须与互斥锁(mutex)一起使用。互斥锁用于保护对共享数据的访问,条件变量用于线程间的等待和唤醒操作。在调用 pthread_cond_wait 函数之前,线程必须先获取互斥锁。

2. 条件检查的持续性

  • 条件
    • 检查条件的代码应该使用 while 循环,而不是 if 语句。因为线程可能被虚假唤醒(即没有其他线程调用 pthread_cond_signal 或 pthread_cond_broadcast 时也被唤醒),使用 while 循环可以确保在被唤醒后重新检查条件是否真正满足。

二、条件变量的使用及函数

  • 1.条件变量的工作流程

    • 当一个线程需要等待某个条件满足时,它首先获取与条件变量相关联的互斥锁,然后检查条件是否满足。如果条件不满足,线程会将自己加入到等待队列(wait_queue)中,并释放互斥锁,进入阻塞状态。
    • 当其他线程修改了共享资源,使得条件可能满足时,它会获取互斥锁,然后检查等待队列。如果等待队列不为空,它会通过条件变量发送唤醒信号(如 pthread_cond_signal 或 pthread_cond_broadcast),唤醒一个或多个等待线程。
    • 被唤醒的线程会从等待队列中移出,重新获取互斥锁,然后再次检查条件是否真正满足(因为可能存在虚假唤醒)。如果条件满足,线程继续执行后续操作;否则,它会再次进入等待队列。

  • 有一家餐厅,里面有几张餐桌(可类比为共享资源),顾客(类比为线程)来餐厅就餐。餐厅有一个服务员(类比为负责管理条件变量和互斥锁的协调者),还有一个叫号系统(类比为条件变量)和一个门(类比为互斥锁)。

 

工作流程

  • 顾客进入餐厅(获取互斥锁)
    • 当顾客来到餐厅门口时,门是关着的(表示互斥锁被锁定),顾客需要等待服务员来开门(获取互斥锁)才能进入。这就像线程在访问共享资源前需要先获取互斥锁,时间以确保同一只有一个线程能访问该资源。
  • 检查就餐条件(检查条件是否满足)
    • 顾客进入餐厅后,会先看看有没有空的餐桌(检查条件是否满足,比如是否有空闲的资源可供使用)。如果没有空餐桌(条件不满足),顾客就会站在叫号区等待(进入等待队列,对应线程调用 pthread_cond_wait 进入等待状态),同时告诉服务员自己在等待(释放互斥锁,让其他顾客可以进入餐厅查看是否有座位)。
  • 有餐桌空出(条件满足)
    • 当有一桌顾客吃完离开(共享资源被释放,条件可能满足),服务员会去查看叫号区是否有等待的顾客(检查等待队列)。如果有,服务员会叫下一个号(通过条件变量发送唤醒信号,如 pthread_cond_signal),通知等待的顾客可以去就餐了(唤醒等待的线程)。
    • 被叫到号的顾客听到叫号后(被唤醒的线程),会走到服务员那里(重新获取互斥锁),然后服务员会再次确认是否真的有空餐桌(再次检查条件,因为可能存在虚假唤醒,比如可能在服务员查看叫号区的瞬间又有新顾客进来占用了刚空出的餐桌)。如果确实有空餐桌,顾客就可以去就餐了(线程继续执行后续操作);如果没有,顾客又得回到叫号区等待(再次进入等待队列)。

2、条件变量所需函数

因为条件变量的使用必须要有锁,所以先再次介绍锁

1、互斥锁函数

互斥锁初始化函数:pthread_mutex_init

互斥锁加锁函数:pthread_mutex_lock

互斥锁解锁函数:pthread_mutex_unlock

互斥锁的销毁函数:pthread_mutex_destroy

pthread_mutex_destroy(pthread_cond_t *cond);

2、条件变量函数

初始化

  • pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
    • 初始化一个条件变量 condattr 是条件变量的属性,通常可设置为 NULL 以使用默认属性。

等待条件变量

  • pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
    • 使线程进入睡眠状态并释放 mutex 锁,等待条件变量 cond 被唤醒。当被唤醒时,它会重新获取 mutex 锁。

​​​​​​​唤醒线程

  • pthread_cond_signal(pthread_cond_t *cond);
    • 唤醒一个等待在条件变量 cond 上的线程。

​​​​​​​销毁条件变量

  • pthread_cond_destroy(pthread_cond_t *cond);
    • 当不再使用条件变量时,将其销毁,释放相关资源。

代码的使用

#include <iostream>
#include <unistd.h>
#include <pthread.h>

int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *Count(void * args)
{
    pthread_detach(pthread_self());
    uint64_t number = (uint64_t)args;
    std::cout << "pthread: " << number << " create success" << std::endl;

    while(true)
    {
        pthread_mutex_lock(&mutex);
        // 我们怎么知道我们要让一个线程去休眠了那?一定是临界资源不就绪,没错,临界资源也是有状态的!!
        // 你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!!
        pthread_cond_wait(&cond, &mutex);               //? 为什么在这里? 1. pthread_cond_wait让线程等待的时候,会自动释放锁!
        // 不管临界资源的状态情况
        std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    for(uint64_t i = 0; i < 5; i++)
    {
        pthread_t tid;
        pthread_create(&tid, nullptr, Count, (void*)i);
        usleep(1000);
    }
    sleep(3);
    std::cout << "main thread ctrl begin: " << std::endl;

    while(true) 
    {
        sleep(1);
        // pthread_cond_signal(&cond); //唤醒在cond的等待队列中等待的一个线程,默认都是第一个
        pthread_cond_broadcast(&cond);
        std::cout << "signal one thread..." << std::endl;
    }

    return 0;
}

3、cp问题(生产者 - 消费者问题)

这就是基本的条件变量使用的问题;

我们的条件变量的使用就像这张图一样

1、一个共享资源(如超市、特定结构的内存空间)

消费者和消费者:它们是互斥的

生产者和生产者:它们也是互斥的

生产者和消费者:它们是同步的

2、两个角色:消费者和生产者

3、我们需要拥有三种关系

消费者和消费者:它们是互斥的

生产者和生产者:它们也是互斥的

生产者和消费者:它们是同步的

  • 三种关系
    • 生产者与生产者之间的互斥:多个生产者不能同时向共享资源写入数据,否则可能导致数据混乱或冲突。例如,多个进程同时向磁盘的同一区域写入文件内容,可能会损坏数据。
    • 消费者与消费者之间的互斥:多个消费者不能同时从共享资源读取同一数据,以免数据不一致。比如多个进程同时读取一个正在被修改的文件,可能得到错误的结果。
    • 生产者与消费者之间的互斥与同步:生产者和消费者之间不仅要互斥访问共享资源,还要同步,以确保生产者生产的数据能被消费者及时消费,且不会出现消费者消费不存在的数据或生产者生产的数据被覆盖等问题。例如,当磁盘空间已满(类似超市货架已满),生产者需要等待消费者释放空间(消费一些文件)后才能继续生产(写入新文件);而当磁盘上没有可消费的文件时,消费者需要等待生产者生产文件后才能继续消费。

四、设计基本的条件变量代码:

 

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

template <class T>
class BlockQueue
{
    static const int defalutnum = 20;
public:
    BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        // low_water_ = maxcap_/3;
        // high_water_ = (maxcap_*2)/3;
    }

    // 谁来唤醒呢?
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == 0) // 因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
        {
            // 如果线程wait时,被误唤醒了呢??
            pthread_cond_wait(&c_cond_, &mutex_); // 你是持有锁的!!1. 调用的时候,自动释放锁,因为唤醒而返回的时候,重新持有锁
        }
        
        T out = q_.front(); // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
        q_.pop();

        // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
        pthread_cond_signal(&p_cond_); // pthread_cond_broadcast
        pthread_mutex_unlock(&mutex_);

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == maxcap_){ // 做到防止线程被伪唤醒的情况
            // 伪唤醒情况
            pthread_cond_wait(&p_cond_, &mutex_); //1. 调用的时候,自动释放锁 2.?
        }
        // 1. 队列没满 2.被唤醒 
        q_.push(in);                    // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
        // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
private:
    std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
    //int mincap_;
    int maxcap_;      // 极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;

    // int low_water_;
    // int high_water_;
};
std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

 

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

    while (true)
    {
        // 消费
        Task t = bq->pop();

        // 计算
        // t.run();
        t();

        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
        // t.run();
        // sleep(1);
    }
}

void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    int x = 10;
    int y = 20;
    while (true)
    {
        // 模拟生产者生产数据
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 生产
        bq->push(t);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
    srand(time(nullptr));

    // 因为 321 原则
    // BlockQueue 内部可不可以传递其他数据,比如对象?比如任务???
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete bq;
    return 0;
}

总结

初始化

  • pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

等待条件变量

  • pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

​​​​​​​唤醒线程

  • pthread_cond_signal(pthread_cond_t *cond);

​​​​​​​销毁条件变量

  • pthread_cond_destroy(pthread_cond_t *cond);


原文地址:https://blog.csdn.net/2301_80687320/article/details/145249391

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!