自学内容网 自学内容网

线程池实现

线程池

本文介绍线程池的作用、线程池的应用场景、线程池的工作原理、代码实现线程池以及与nginx的线程池对比分析。

错误理解:要使用线程就从线程池里面拿一个线程出来使用,用完再返回给线程池。这种理解是连接池的概念。而线程池是多个线程去任务队列取任务,竞争任务。

线程池的作用

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池的主要作用包括:

  1. 减少线程创建和销毁的开销:线程的创建和销毁需要消耗系统资源,线程池通过复用线程来减少这部分开销。
  2. 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  3. 控制并发数量:避免大量线程之间因互相抢占系统资源而导致的阻塞现象。
  4. 提供更多功能:如定时执行、定期执行、线程中断等。
线程池的工作原理

线程池的工作原理主要包括以下几个步骤:

  1. 创建线程池:预先创建一定数量的线程。
  2. 提交任务:将任务提交到任务队列中。
  3. 分配任务:线程池中的线程从任务队列中获取任务并执行。
  4. 任务完成:线程执行完任务后返回线程池等待下一个任务。
  5. 线程复用:线程不会被销毁,而是继续执行新的任务
while(1){
    get_task();
    task->func();
}
代码实现线程池
#ifndef __THREAD_m_poolH__
#define __THREAD_m_poolH__

#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>

class ThreadPool {
public:
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    static ThreadPool& instance() {
        static ThreadPool ins;
        return ins;
    }

    using Task = std::packaged_task<void()>;


    ~ThreadPool() {
        stop();
    }

    template <class F, class... Args>
    auto commit(F&& f, Args&&... args) -> 
    //std::future<decltype(f(args)...)>
        std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {
        using RetType = decltype(std::forward<F>(f)(std::forward<Args>(args)...));
        if (m_stop.load())
            return std::future<RetType>{};

        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<RetType> ret = task->get_future();
        {
            std::lock_guard<std::mutex> cv_mt(m_mutex);
            m_task.emplace([task] { (*task)(); });
        }
        m_cond.notify_one();
        return ret;
    }

    int idleThreadCount() {
        return m_threadPoolSize;
    }
    std::atomic_int getSize() {
        return m_threadPoolSize.load();
    }
private:
    ThreadPool(unsigned int num = std::thread::hardware_concurrency())
        : m_stop(false) {
            {
                if (num == 2)
                    m_threadPoolSize = 1;
                else
                    m_threadPoolSize = num;
            }
            start();
    }
    void start() {
        for (int i = 0; i < m_threadPoolSize; ++i) {
             m_pool.emplace_back([this]() {
                // load() 原子获取,store() 原子更新
                while (!this->m_stop.load()) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> cv_mt(m_mutex);
                        this->m_cond.wait(cv_mt, [this] {
                            // this->
                            return this->m_stop.load() || !this->m_task.empty();
                            });
                        if (this->m_task.empty())
                            return;

                        task = std::move(this->m_task.front());
                        this->m_task.pop();
                    }
                    this->m_threadPoolSize--;
                    task();
                    this->m_threadPoolSize++;
                }
                });
        }
    }
    void stop() {
        m_stop.store(true);
        m_cond.notify_all();
        for (auto& td : m_pool) {
            if (td.joinable()) {
                std::cout << "join thread " << td.get_id() << std::endl;
                td.join();
            }
        }
    }


private:
    std::mutex               m_mutex;
    std::condition_variable  m_cond;
    std::atomic_bool         m_stop;
    std::atomic_int          m_threadPoolSize;
    std::queue<Task>         m_task;
    std::vector<std::thread> m_pool;
};

#endif  // !__THREAD_m_poolH__

原文地址:https://blog.csdn.net/weixin_51332735/article/details/142780848

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!