线程池实现
线程池
本文介绍线程池的作用、线程池的应用场景、线程池的工作原理、代码实现线程池以及与nginx的线程池对比分析。
错误理解:要使用线程就从线程池里面拿一个线程出来使用,用完再返回给线程池。这种理解是连接池的概念。而线程池是多个线程去任务队列取任务,竞争任务。
线程池的作用
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池的主要作用包括:
- 减少线程创建和销毁的开销:线程的创建和销毁需要消耗系统资源,线程池通过复用线程来减少这部分开销。
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 控制并发数量:避免大量线程之间因互相抢占系统资源而导致的阻塞现象。
- 提供更多功能:如定时执行、定期执行、线程中断等。
线程池的工作原理
线程池的工作原理主要包括以下几个步骤:
- 创建线程池:预先创建一定数量的线程。
- 提交任务:将任务提交到任务队列中。
- 分配任务:线程池中的线程从任务队列中获取任务并执行。
- 任务完成:线程执行完任务后返回线程池等待下一个任务。
- 线程复用:线程不会被销毁,而是继续执行新的任务
while(1){
get_task();
task->func();
}
代码实现线程池
#ifndef __THREAD_m_poolH__
#define __THREAD_m_poolH__
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>
class ThreadPool {
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
static ThreadPool& instance() {
static ThreadPool ins;
return ins;
}
using Task = std::packaged_task<void()>;
~ThreadPool() {
stop();
}
template <class F, class... Args>
auto commit(F&& f, Args&&... args) ->
//std::future<decltype(f(args)...)>
std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {
using RetType = decltype(std::forward<F>(f)(std::forward<Args>(args)...));
if (m_stop.load())
return std::future<RetType>{};
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(m_mutex);
m_task.emplace([task] { (*task)(); });
}
m_cond.notify_one();
return ret;
}
int idleThreadCount() {
return m_threadPoolSize;
}
std::atomic_int getSize() {
return m_threadPoolSize.load();
}
private:
ThreadPool(unsigned int num = std::thread::hardware_concurrency())
: m_stop(false) {
{
if (num == 2)
m_threadPoolSize = 1;
else
m_threadPoolSize = num;
}
start();
}
void start() {
for (int i = 0; i < m_threadPoolSize; ++i) {
m_pool.emplace_back([this]() {
// load() 原子获取,store() 原子更新
while (!this->m_stop.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(m_mutex);
this->m_cond.wait(cv_mt, [this] {
// this->
return this->m_stop.load() || !this->m_task.empty();
});
if (this->m_task.empty())
return;
task = std::move(this->m_task.front());
this->m_task.pop();
}
this->m_threadPoolSize--;
task();
this->m_threadPoolSize++;
}
});
}
}
void stop() {
m_stop.store(true);
m_cond.notify_all();
for (auto& td : m_pool) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}
private:
std::mutex m_mutex;
std::condition_variable m_cond;
std::atomic_bool m_stop;
std::atomic_int m_threadPoolSize;
std::queue<Task> m_task;
std::vector<std::thread> m_pool;
};
#endif // !__THREAD_m_poolH__
原文地址:https://blog.csdn.net/weixin_51332735/article/details/142780848
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!