自学内容网 自学内容网

(转载)std::mutex && std::condition_variable

#include <iostream>
#include <mutex>
#include <queue>
 
using data_chunk = int ;
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
int i = 0;
 
bool more_data_to_prepare()
{
    return true;
}
 
data_chunk prepare_data()
{
    return i++;
}
 
void process(data_chunk data)
{
    std::cout << data << std::endl;
}
 
bool is_last_chunk(data_chunk)
{
    return false;
}
 
void data_preparation_thread()
{
    while(more_data_to_prepare())
    {
        const data_chunk data = prepare_data();
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);   // (2)
        data_cond.notify_one();  // (3)
    }
}
 
void data_processing_thread()
{
    while(true)
    {
        std::unique_lock<std::mutex> lk(mut); // (4)
        data_cond.wait(lk, [] {return !data_queue.empty(); });  // (5)
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock();  // (6)
        process(data);
        if (is_last_chunk(data))
            break;
    }
}
 
int main()
{
    std::thread t1(data_preparation_thread);
    std::thread t2(data_processing_thread);
 
    t1.join();
    t2.join();
    return 0;
}

注意(4)必须使用 unique_lock,而不能用guard_lock,因为在condition_variable内部实现中,有可能(5)要多次进行lock和unlock操作。

 
int main()
{
    std::thread t1(data_preparation_thread);
    std::thread t2(data_processing_thread);
 
    t1.join();
    t2.join();
    return 0;
}

首先,队列中中有两个线程,两个线程之间会对数据进行传递①。数据准备好时,使用std::lock_guard锁定队列,将准备好的数据压入队列②之后,线程会对队列中的数据上锁,并调用std::condition_variable的notify_one()成员函数,对等待的线程(如果有等待线程)进行通知③。

另外的一个线程正在处理数据,线程首先对互斥量上锁。之后会调用std::condition_variable的成员函数wait(),传递一个锁和一个Lambda表达式(作为等待的条件⑤)。Lambda函数是C++11添加的新特性,可以让一个匿名函数作为其他表达式的一部分,并且非常合适作为标准函数的谓词。例子中,简单的Lambda函数[]{return !data_queue.empty();}会去检查data_queue是否为空,当data_queue不为空,就说明数据已经准备好了。

wait()会去检查这些条件(通过Lambda函数),当条件满足(Lambda函数返回true)时返回。如果条件不满足(Lambda函数返回false),wait()将解锁互斥量,并且将线程(处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠中苏醒,重新获取互斥锁,并且再次进行条件检查。在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并重新等待。这就是为什么用std::unique_lock而不使用std::lock_guard的原因——等待中的线程必须在等待期间解锁互斥量,并对互斥量再次上锁,而std::lock_guard没有这么灵活。如果互斥量在线程休眠期间保持锁住状态,准备数据的线程将无法锁住互斥量,也无法添加数据到队列中。同样,等待线程也永远不会知道条件何时满足。

任意的函数和可调用对象都可以传入wait()。当写好函数做为检查条件时,不一定非要放在一个Lambda表达式中,也可以直接将这个函数传入wait()参数。调用wait()的过程中,在互斥量锁定时,可能会去检查条件变量若干次,当提供测试条件的函数返回true就会立即返回。当等待线程重新获取互斥量并检查条件变量时,并非直接响应另一个线程的通知,就是所谓的伪唤醒(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,所以不建议使用有副作用的函数做条件检查。

等待事件或条件(std::condition_variable)_std::condition条件等待-CSDN博客

std::condition_variable用的比较多的情况就是等待一个条件或多个条件成立,譬如在公司的BIM工程中要渲染几千个对象,就必须要多线程并发Draw操作,给cpu的每个线程分配适量的Draw对象数量,尽量让每个线程能几乎同时DrawCmd完成,如果只是主线程单独去Draw,那Draw完至少7分钟,工程大的话至少10分钟以上,而用了多线程后加载完并显示出来就不到几十秒,大大节省了时间,以下是扣的代码框架:


#include "BimObjects.h"

#include "BimMemoryManage.h"

#include "BimObjectManage.h"

#include "BimDrawCmd.h"

#include "BimRender.h"

#include "BimCmdPool.h"

#include "BimDescriptorSets.h"

#include "BimApp.h"

#include "BimBuffers.h"

#include <freetype.h>
#include <iostream>

#include <functional>
#include <condition_variable>
#include <vector>


struct ThreadData
{
    int index;
    int frameID;
};


class RenderThread
{
public:
    typedef std::function<void()> ThreadFunc;

    explicit RenderThread(ThreadFunc func)
        : m_ThreadFunc(func)
        , m_Thread(func)
    {

    }

    ~RenderThread()
    {
        if (m_Thread.joinable())
        {
            m_Thread.join();
        }
    }

    RenderThread(RenderThread const&) = delete;

    RenderThread& operator=(RenderThread const&) = delete;

private:
    std::thread m_Thread;
    ThreadFunc  m_ThreadFunc;
};


std::mutex                  m_FrameStartLock;
std::condition_variable     m_FrameStartCV;

std::mutex                  m_ThreadDoneLock;
std::condition_variable     m_ThreadDoneCV;
int                         m_ThreadDoneCount;

std::vector<ThreadData*>    m_ThreadDatas;
std::vector<RenderThread*>      m_Threads;
bool                        m_ThreadRunning;
int                         m_MainFrameID;

void ThreadRendering(void* param)
{
    ThreadData* threadData = (ThreadData*)param;
    threadData->frameID = 0;

    while (true)
    {
        {
            std::unique_lock<std::mutex> guardLock(m_FrameStartLock);
            if (threadData->frameID == m_MainFrameID)
            {
                printf("threadData->frameID == m_MainFrameID:%d %d idx=%d waiting\n", threadData->frameID, 
                    m_MainFrameID, threadData->index);
                m_FrameStartCV.wait(guardLock);
            }
            else
                printf("threadData->frameID != m_MainFrameID:%d %d idx=%d\n", threadData->frameID, 
                    m_MainFrameID, threadData->index);

        }

        threadData->frameID = m_MainFrameID;

        if (!m_ThreadRunning)
        {
            break;
        }

        printf("thread_frameid:                      %d %d idx=%d\n", threadData->frameID, 
            m_MainFrameID, threadData->index);


        // notify thread done
        {
            std::lock_guard<std::mutex> lockGuard(m_ThreadDoneLock);
            m_ThreadDoneCount += 1;
            m_ThreadDoneCV.notify_one();
        }
    }
}


int main()
{
    int numThreads = 10;

    // thread task
    m_MainFrameID = 0;
    m_ThreadRunning = true;

    m_ThreadDatas.resize(numThreads);
    m_Threads.resize(numThreads);

    for (int i = 0; i < numThreads; ++i)
    {
        // prepare thread data
        m_ThreadDatas[i] = new ThreadData();
        // start thread
        m_ThreadDatas[i]->index = i;
        m_Threads[i] = new RenderThread(
            [=]
            {
                ThreadRendering(m_ThreadDatas[i]);
            }
        );
    }

    while (1) {
        // notify fram start

        {
            std::lock_guard<std::mutex> lockGuard(m_FrameStartLock);
            m_ThreadDoneCount = 0;
            m_MainFrameID += 1;
            m_FrameStartCV.notify_all();
        }

        // wait for thread done
        {
            std::unique_lock<std::mutex> lockGuard(m_ThreadDoneLock);
            while (m_ThreadDoneCount != m_Threads.size())
            {
                m_ThreadDoneCV.wait(lockGuard);
            }
        }
    }
}

代码逻辑就是必须等待所有线程都Draw完成才调用接下来的Present


原文地址:https://blog.csdn.net/zxbcollegestudent/article/details/142431589

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!