自学内容网 自学内容网

缓冲式线程池C++简易实现

前言 :

代码也比较短,简单说一下代码结构,是这样的:


SyncQueue.hpp封装了一个大小为MaxTaskCount的同步队列,这是一个模板类,它在线程池中承担了存放任务等待线程组中的线程来执行的角色。最底层是std::list<T>但是我们起名为m_queue,因为我们使用list模拟了队列,这里使用std::vector是不合适的,我们要频繁插入(添任务)删除(取任务),开销太大,具体了解一下std::vector底层。

我们还加了一个m_mtx互斥锁,因为我们要保证对于队列的访问是线程安全的,但是因为使用unique_lock对于m_mtx要修改,所以我们加了mutable关键字(mutable保证我们可以在常方法中修改类的非静态成员)。

        std::condition_variable m_notEmpty;
        std::condition_variable m_notFull;

他俩是条件变量是用来同步队列任务是否为空为满的。

Add()是添加任务调用的底层函数,我们对它做了一个封装,分别适合左值和右值:

        int Put(const T& task)
        {
            return Add(task);
        }
        int Put(T&& task)
        {
            return Add(std::forward<T>(task));
        } 

 同理Take也是两个。

WaitQueueEmptyStop()是后期添加的一个函数,比如在我们添加任务结束后,线程来执行任务,但是此时主线程准备结束,我们调用析构线程池对象的析构函数,它最终会调用这个函数判断任务队列中是否还有任务,如果不空,那么我就弃锁睡眠1秒,循环往复,直到队列为空。


 CachedThreadPool.hpp是线程池的代码,Task是可调用对象的包装器,上述任务队列中放的就是Task,我们添加的也是Task,执行的也是Task。底层封装了上述的任务队列。

我们使用一个

std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;

来存储某个thread的id 和管理它的共享性智能指针,方便我们到了KeepAliveTimes秒删除它,这是也是我们起名为CachedThreadPool的原因,它是一个缓冲型线程池,线程数量是浮动的,受制于两个原子变量的限制,让线程数不至于太少无法执行任务,不至于太多而空闲:

        std::atomic<int> m_idleThreadSize;
        std::atomic<int> m_curThreadSize;

构造函数中我们Start()开两个线程,线程的入口函数是CachedThreadPool::RunInThread(),让他们去检测是否有任务,有就task();反之陷入m_queue.Take(task)。 

析构函数中我们调用StopThreadGroup(),它会先调用任务队列的WaitQueueEmptyStop()确保任务队列为空,然后使用一个range-for结束掉线程组:
            for (auto& x : m_threadgroup)
            {
                if (x.second->joinable())
                {
                    x.second->join();
                }
            }

还有一个重要的成员函数:

        template <class Func, class...  Args>
        void AddTask(Func&& func, Args&&... args)
        {
            auto task = std::make_shared<std::function<void()> >(
                std::bind(std::forward<Func>(func),
                    std::forward<Args>(args)...));

            if (m_queue.Put([task]() { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
        }

这是一个模板函数,用来添加没有返回值的任务到任务队列中,使用了引用性别未定义,可变模板参数,bind,完美转发,lambda表达式,智能指针,这个函数的成型颇具困难。

        template <class Func, class... Args>
        auto submit(Func&& func, Args &&...args)
        {
            using RetType = decltype(func(args...));
            auto task = std::make_shared<std::packaged_task<RetType()>>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
            std::future<RetType> result = task->get_future();

            if (m_queue.Put([task]()
                { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
            return result;
        }

 这是一个模板函数,用来添加带有返回值的任务到任务队列。


main.cpp中有一个有意思的问题,可以探讨一下:

std::shared_ptr<FILE> pf(fopen("test.txt", "w"));

//std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());

main.cpp是我们的测试文件,我们想把测试结果打印到文件中,方便观察,我们刚开始使用了一个裸的FILE* fp;然后在main函数结束之后fclose(fp); fp = nullptr;

结果我们跑不通,最终发现是线程池对象析构之前的这两行代码会被执行,文件被关闭,我们无法将结果写入文件中。

最后我们使用了shared_ptr<FILE>来管理文件,但是写成了第一行,我们发现往屏幕上写结果正确但是写文件不行,而且往屏幕上写总是在线程池对象完美析构,一切安顿好之后出现异常,这令人困惑,我以为是我们线程的释放还做得有问题,谁曾想是这个指针坏事了,pf对象作为一个全局对象,析构于局部的线程池对象之后,此时它会调用它的默认删除器,也就是delete,这显然是不行的,文件指针应该fclose才对,所以我们写了一个自定义的删除器来改正错误。

struct fileDeleter {
    void operator()(FILE* fp)const {
        if (fp) {
            cout << "file close\n";
            fclose(fp);
        }
    }
}; 

类似错误还有很多,比如之前的:

红框中我们错写为了:m_queue.pop_back();

这个错误虽然看似简单,实则复杂,因为打印出来结果很流畅,但是值不对,后来人肉debug发现这个错误后令我哭笑不得,好家伙,每次拿走一个任务,竟然pop末尾的未被执行的任务..........

多线程代码调试我还不擅长,任重而道远,但我喜欢De这些有趣的bug! 


 代码总体逻辑类似于生产者-消费者模型,这也是OS学习的经典中的经典,线程池基本就是从这里扩展而来,又分化为了各种类别的线程池,定长的,缓冲的,窃取的.................

源代码:

main.cpp 

#define _CRT_SECURE_NO_WARNINGS

// C++ STL
#include <thread>
#include <iostream>
using namespace std;
#include"CachedThreadPool.hpp"
struct fileDeleter {
void operator()(FILE* fp)const {
if (fp) {
cout << "file close\n";
fclose(fp);
}
}
};
class Int
{
private:
int value;
public:
Int(int x = 0) : value(x)
{
cout << "create Int " << value << endl;
}
~Int()
{
cout << "destroy Int: " << value << endl;
}
Int(const Int& it) :value(it.value)
{
cout << "Copy Int " << value << endl;
}
Int(Int&& it) :value(it.value)
{
cout << "Move create Int : " << value << endl;
it.value = -1;
}
Int& operator=(const Int& it)
{
if (this != &it)
{
value = it.value;
}
cout << "operator=" << endl;
return *this;
}
Int& operator=(Int&& it)
{
if (this != &it)
{
value = it.value;
it.value = -1;
}
cout << "operator=(Int &&)" << endl;
return *this;
}
Int& operator++()
{
value += 1;
return *this;
}
Int operator++(int)
{
Int tmp = *this;
++* this;
return tmp;
}
bool operator<(const int x) const
{
return this->value < x;
}
ostream& operator<<(ostream& out) const {
return out << value;
}
};
ostream& operator<<(ostream& out, const Int& it)
{
return it << out;
}

std::mutex mtx;
std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());
void print(Int x)
{

std::lock_guard<std::mutex> lock(mtx);
//cout << "print x: " << &x << " " << x << endl;
fprintf(pf.get(), "print x : %d => &x: %p \n", x, &x);
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
}
void funa(myspace::CachedThreadPool& mypool)
{
double dx = 1;
for (Int i = 0; i < 100; ++i)
{
mypool.AddTask(print, i);
}
cout << "funa ... end " << endl;
}

int main()
{
myspace::CachedThreadPool mypool;
std::thread tha(funa, std::ref(mypool));
tha.join();
return 0;
}

SyncQueue.hpp


// C++ STL
#include <list>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;

#ifndef SYNC_QUEUE_HPP
#define SYNC_QUEUE_HPP
static const int MaxTaskCount = 2000;
namespace myspace
{
template <class T>
class SyncQueue
{
private:
std::list<T> m_queue;
mutable std::mutex m_mtx;
std::condition_variable m_notEmpty;
std::condition_variable m_notFull;
std::condition_variable m_waitStop;
int m_maxSize;
int m_waitTime;
bool m_needStop;

bool IsFull() const
{
bool full = m_queue.size() >= m_maxSize;
if (full)
{
cerr << "m_queue full .. wait .. " << endl;
}
return full;
}
bool IsEmpty() const
{
bool empty = m_queue.empty();
if (empty)
{
cerr << "m_queue empty...  wait ... " << endl;
}
return empty;
}

template <class F>
int Add(F&& task)
{
std::unique_lock<std::mutex> lock(m_mtx);
if (!m_notFull.wait_for(lock, std::chrono::seconds(m_waitTime),
[this]()
{ return m_needStop || !IsFull(); }))
{
return 1; // 任务队列已达上限
}

if (m_needStop)
{
return 2; // 任务队列stop;
}
m_queue.push_back(std::forward<F>(task));
m_notEmpty.notify_all();
return 0;
}

public:
SyncQueue(int maxsize = MaxTaskCount, int timeout = 1)
: m_maxSize(maxsize), m_needStop(false), m_waitTime(timeout)
{
}
~SyncQueue()
{
if (!m_needStop)
{
Stop();
}
}
int Put(const T& task)
{
return Add(task);
}
int Put(T&& task)
{
return Add(std::forward<T>(task));
}

int Take(std::list<T>& tlist)
{
std::unique_lock<std::mutex> lock(m_mtx);
if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),
[this]() {  return m_needStop || !IsEmpty(); }))
{
return 1; // timeout;
}
if (m_needStop)
{
return 2;
}
tlist = std::move(m_queue);
m_notFull.notify_all();
return 0;
}
int Take(T& task)
{
std::unique_lock<std::mutex> lock(m_mtx);
if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),
[this]() {  return m_needStop || !IsEmpty(); }))
{
return 1; // timeout;
}
if (m_needStop)
{
return 2;
}
task = m_queue.front();
m_queue.pop_front();
m_notFull.notify_all();
return 0;
}
void WaitQueueEmptyStop()
{
std::unique_lock<std::mutex> locker(m_mtx);
while (!IsEmpty())
{
m_waitStop.wait_for(locker, std::chrono::seconds(1));
}
m_needStop = true;
m_notFull.notify_all();
m_notEmpty.notify_all();
}
void Stop()
{
{
std::unique_lock<std::mutex> lock(m_mtx);
m_needStop = true;
}
m_notEmpty.notify_all();
m_notFull.notify_all();
}

bool Empty() const
{
std::unique_lock<std::mutex> lock(m_mtx);
return m_queue.empty();
}
bool Full() const
{
std::unique_lock<std::mutex> lock(m_mtx);
return m_queue.size() >= m_maxSize;
}

size_t Size() const
{
std::unique_lock<std::mutex> lock(m_mtx);
return m_queue.size();
}
size_t Count() const
{
return m_queue.size();
}
};
}
#endif

CachedThreadPool.hpp

//C API
#include<time.h>
//OWN
#include "SyncQueue.hpp"
// C++ STL
#include <functional>
#include <unordered_map>
#include <map>
#include <future>
#include <atomic>
#include <memory>
#include <thread>
using namespace std;
#ifndef CACHED_THREAD_POOL_HPP
#define CACHED_THREAD_POOL_HPP

namespace myspace
{
static const int InitThreadNums = 2;
static const time_t KeepAliveTimes = 5;
class CachedThreadPool
{
public:
using Task = std::function<void(void)>;
private:
std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;
int m_coreThreadSize;
int m_maxThreadSize;
std::atomic<int> m_idleThreadSize;
std::atomic<int> m_curThreadSize;
std::mutex m_mutex;
myspace::SyncQueue<Task> m_queue;
std::atomic<bool> m_running;
std::once_flag m_flag;
void RunInThread()
{
auto tid = std::this_thread::get_id();
time_t startTime = time(nullptr);
while (m_running)
{
Task task;
if (m_queue.Size() == 0)
{
time_t  curnow = time(nullptr);
time_t intervalTime = curnow - startTime;
if (intervalTime >= KeepAliveTimes && m_curThreadSize > m_coreThreadSize)
{
m_threadgroup.find(tid)->second->detach();
m_threadgroup.erase(tid);
m_curThreadSize -= 1;
m_idleThreadSize -= 1;
cerr << "delete idle thread tid: " << tid << endl;
cerr << "idle thread num: " << m_idleThreadSize << endl;
cerr << "cur thread num: " << m_curThreadSize << endl;
return;
}
}
if (!m_queue.Take(task) && m_running)
{
m_idleThreadSize -= 1;
task();
m_idleThreadSize += 1;
startTime = time(nullptr);
}
}
}
void Start(int numthreas)
{
m_running = true;
m_curThreadSize = numthreas;
for (int i = 0; i < numthreas; ++i)
{
auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);
std::thread::id tid = tha->get_id();
m_threadgroup.emplace(tid, std::move(tha));
m_idleThreadSize += 1;
}
}
void StopThreadGroup()
{
m_queue.WaitQueueEmptyStop();
m_running = false;
for (auto& x : m_threadgroup)
{
if (x.second->joinable())
{
x.second->join();
}
}
m_threadgroup.clear();
}
void AddnewThread()
{
if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize)
{
auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);
std::thread::id tid = tha->get_id();
m_threadgroup.emplace(tid, std::move(tha));
m_idleThreadSize += 1;
m_curThreadSize += 1;
cerr << "AddnewThread id: " << tid << endl;
cerr << "m_curThreadSize: " << m_curThreadSize << endl;
}
}

public:
CachedThreadPool(int initNumThreads = InitThreadNums, int taskQueueSize = MaxTaskCount)
: m_coreThreadSize(initNumThreads),
m_maxThreadSize(std::thread::hardware_concurrency() + 1),
m_idleThreadSize(0),
m_curThreadSize(0),
m_queue(taskQueueSize),
m_running(false)
{
Start(m_coreThreadSize);
}
~CachedThreadPool()
{
if (m_running)
{
Stop();
}
}
void Stop()
{
std::call_once(m_flag, [this]()
{ StopThreadGroup(); });
}
template <class Func, class...  Args>
void AddTask(Func&& func, Args&&... args)
{
auto task = std::make_shared<std::function<void()> >(
std::bind(std::forward<Func>(func),
std::forward<Args>(args)...));

if (m_queue.Put([task]() { (*task)(); }) != 0)
{
cerr << "not add task queue... " << endl;
(*task)();
}
AddnewThread();
}

template <class Func, class... Args>
auto submit(Func&& func, Args &&...args)
{
using RetType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RetType> result = task->get_future();

if (m_queue.Put([task]()
{ (*task)(); }) != 0)
{
cerr << "not add task queue... " << endl;
(*task)();
}
AddnewThread();
return result;
}
};
}

#endif


原文地址:https://blog.csdn.net/kitesxian/article/details/143772616

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!