自学内容网 自学内容网

Linux - 线程池

线程池

什么是池?

池化技术的核心就是"提前准备并重复利用资源". 减少资源创建和销毁的成本.

那么线程池就是提前准备好一些线程, 当有任务来临时, 就可以直接交给这些线程运行, 当线程完成这些任务后, 并不会被销毁, 而是继续等待任务. 那么这些线程在程序运行过程中只需要创建一次. 相比当任务来临时创建线程, 当线程完成任务后销毁线程这种方法. 省去了很多线程创建销毁的成本.

一次性创建多个线程, 那么就需要将这些线程集中管理起来, 将这先线程集中起来并进行管理, 这样的集合就时线程池.

线程池实现

可以看到在线程池中, 我们至少需要对线程和任务进行管理

线程: 这里我们使用 vector 进行管理

任务: 这里使用 queue 对任务进行管理

首先对于传递给线程的数据先进行一步封装

class ThreadData
{
public:
    ThreadData(const std::string& name, void* arg)
    :_name(name),
    _arg(arg)
    {}

    std::string _name; // 本线程名称
    void* _arg; // 本线程所需(参数)数据
};

然后对线程也进行封装

class Thread
{
public:
    Thread(const int& num, const std::string& name, func_t func, void* arg)
    :_num(num),
    _name(name),
    _func(func),
    _data(name, arg)
    {}

    void start() // 只有调用 start 函数, 线程才会真正被创建
    {
        pthread_create(&_t, NULL, _func, (void*)&_data);
    }

    void join()
    {
        pthread_join(_t, NULL);
    }

private:
    int _num; // 第几号线程
    pthread_t _t; // 线程的 pthread_t
    std::string _name; // 线程名称
    func_t _func; // 线程要执行的函数
    ThreadData _data; // 线程所需要的参数(数据)
};

最后再是线程池 threadpool 代码.

#include<iostream>
#include<pthread.h>
#include<vector>
#include<queue>
#include<functional>
#include<unistd.h>

// 对于线程要执行的函数的重命名
typedef void*(*func_t)(void*);

class ThreadData
{
public:
    ThreadData(const std::string& name, void* arg)
    :_name(name),
    _arg(arg)
    {}

    std::string _name; // 本线程名称
    void* _arg; // 本线程所需数据
};

class Thread
{
public:
    Thread(const int& num, const std::string& name, func_t func, void* arg)
    :_num(num),
    _name(name),
    _func(func),
    _data(name, arg)
    {}

    void start()
    {
        pthread_create(&_t, NULL, _func, (void*)&_data);
    }

    void join()
    {
        pthread_join(_t, NULL);
    }

private:
    int _num; // 第几号线程
    pthread_t _t; // 线程的 pthread_t
    std::string _name; // 线程名称
    func_t _func; // 线程要执行的函数
    ThreadData _data; // 线程所需要的参数(数据)
};

template<class T>
class threadpool
{
public:
    threadpool(int num = 5)
    {
        for(int i = 0; i < num; ++i)
        {
            std::string name = "线程" + std::to_string(i);
            _threads.push_back(new Thread(i, name, route, (void*)this));
        }
    }

    void run() // 让线程池中的线程都被创建出来, 并运行起来
    {
        for(int i = 0; i < _threads.size(); ++i)
        {
            _threads[i]->start();
        }
    }

    // 将数据放入线程池中
    void push(const T& data)
    {
        pthread_mutex_lock(&_mtx);
        _tasks.push(data);
        pthread_mutex_unlock(&_mtx);
        pthread_cond_signal(&_cv);
    }

    // 析构函数
    ~threadpool()
    {
        for(int i = 0; i < _threads.size(); ++i)
        {
            _threads[i]->join();
        }
    }

private:
    // 线程所要执行的函数
    // pthread_create 函数中, 传递给线程的函数类型就是 void*(*)(void*)
    // 如果这里不使用 static 静态函数, 而使用成员函数
    // 成员函数中默认会添加一个参数(this指针), 这样的化成员函数的类型就和 pthread_create 函数要求的类型不匹配
    static void* route(void* arg)
    {
        // 从 Thread 的 start 函数中可以看到, 我们传递给函数的参数类型是 ThreadData*
        // 而 ThreadData 中的 _arg的实际类型, 通过 Thread 的构造函数和 run 函数可以看到
        // 给 ThreadData 的第二个参数实际上就 this 指针, this == threadpool<T>*
        // 所以 _arg 就是这个线程池对象的指针. 那么通过 _arg 也就能访问到
        // 线程池中的 _tasks. 
        ThreadData* td = (ThreadData*)arg;
        threadpool<T>* tp = (threadpool<T>*)td->_arg;
        T task;
        while(true)
        {
            // 获取数据
            pthread_mutex_lock(&tp->_mtx);
            while(tp->_tasks.empty() == true) // 如果没有数据了, 那么就进行等待
            {
                pthread_cond_wait(&tp->_cv, &tp->_mtx);
            }
            task = tp->_tasks.front();
            tp->_tasks.pop();
            pthread_mutex_unlock(&tp->_mtx);

            // 处理数据
            // 这里直接打印, 便于观察
            std::cout << td->_name << ": "<< task << std::endl;
        }
        return NULL;
    }

private:
    std::vector<Thread*> _threads; // 管理所有的线程
    std::queue<T> _tasks; // 管理所有的任务
    pthread_mutex_t _mtx; // 线程间互斥的访问 _tasks
    pthread_cond_t _cv; // 当 _tasks 中没有任务时, 让所有的线程进行等待, 直到被唤醒
};

int main()
{
    threadpool<int> tp;
    tp.run();
    for(int i = 0; i < 10000; ++i)
    {
        tp.push(i);
    }
    sleep(5);
    exit(0);
    return 0;
}

原文地址:https://blog.csdn.net/m0_74506452/article/details/145289985

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!