使用C++11相关特性实现一个线程池
前言
PS:本文来自于腾讯课堂零声学院上课课件
关于线程池的基本概念可以参考Linux C实现线程池,提到了如何实现线程池,如果使用C++11来实现一个线程池,应该如何实现呢,在实现线程池之前,先熟悉一下涉及到相关知识点,由于线程池用了大量的C++11特性,如果对C++11比较熟悉,可以直接看线程池代码,否则内容太多,会容易看着看着就放弃了。
可变模板参数
C++11的新特性–可变模版参数(variadic templates)是C++11新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
模板参数的展开
模板参数的语法如下
template <class ... T>
void f(T ...args);
上面的args参数前面有省略号,是可变模板参数,我们叫这样的参数为“参数包”,它可以包含0到N个模板参数,无法直接获取参数包args中的每个参数,只能通过展开参数包的方式来获取参数包中的每个参数。可以使用如下方式获取参数的个数
sizeof ...(args);
如果要获取每个参数,可以通过递归展开包的方式,如下所示
#include <iostream>
using namespace std;
//递归终止函数
void print()
{
cout << "empty" << endl;
}
//展开函数
template <class T, class ...Args>
void print(T head, Args... rest)
{
cout << "parameter " << head << endl;
print(rest...);
}
int main(void)
{
print(1,2,3,4);
return 0;
}
thread
C++11创建多线程非常简单,其初始化构造函数如下
//创建std::thread执行对象,该thread对象可被joinable,新产生的线程会调用threadFun函数,该函数的参数由 args 给出
template<class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
并且thread对象不能被拷贝(拷贝构造是delete的)。主要的成员函数如下
get_id():获取线程id,返回类型为std::thread::id对象
joinable():判断线程是否可以加入等待
join():等待线程执行完毕后返回
detach():将本线程从调用线程中分离出来,允许本线程独立执行(但是当主线程结束的时候,子线程也会被强制终止)
测试代码如下
void func1()
{
cout << "func1 into" << endl;
}
int main()
{
std::thread t1(func1); // 只传递函数
t1.join(); // 阻塞等待线程函数执行结束
}
构造函数的函数对象可以是普通的函数,也可以是类的静态成员函数,只要是函数类型即可。
我们可以使用提供的std::thread封装我们自己的线程类对象。
互斥量
C++11提供4种语义的互斥量(mutex)
- std::mutex 独占的互斥量,不能递归使用
- std::time_mutex,带超时的独占互斥量,不能递归使用
- std::recursive_mutex,递归互斥量,不带超时功能
- std::recursive_timed_mutex,带超时的递归互斥量
std::mutex
std::mutex 是C++11 中最基本的互斥量,std::mutex 对象提供了独占所有权的特性——即不支持递归地对 std::mutex 对象上锁,而 std::recursive_lock 则可以递归地对互斥量对象上锁。
成员函数如下
- 构造函数,std::mutex不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于unlocked 状态的。
- lock(),调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:(1). 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。(2). 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。使用std::recursive_mutex课可以解决这个问题。
- unlock(), 解锁,释放对互斥量的所有权。
- try_lock(),尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面 3 种情况,(1). 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。(2). 如果当前互斥量被其他线程锁住,则当前调用线程返回false,而并不会被阻塞掉。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
下面演示使用mutex死锁的代码范例
#include <iostream>
#include <thread>
#include <mutex>
struct Complex
{
std::mutex mutex;
int i;
Complex() : i(0){}
void mul(int x)
{
std::lock_guard<std::mutex> lock(mutex);
i *= x;
}
void div(int x)
{
std::lock_guard<std::mutex> lock(mutex);
i /= x;
}
void both(int x, int y)
{
std::lock_guard<std::mutex> lock(mutex);
mul(x);
div(y);
}
};
int main(void)
{
Complex complex;
complex.both(32, 23);
return 0;
}
上述代码,不会终止,会一直卡着,将std::mutex换成std::recursive_mutex即可解决问题。
建议:不建议使用std::recursive_mutex,如果你考虑使用了std::recursive_mutex,一般都是代码的设计有问题。主要原因如下:
- 需要用到递归锁的多线程互斥处理本身就是可以简化的,允许递归很容易放纵复杂逻辑的产生,并且产生晦涩,当要使用递归锁的时候应该重新审视自己的代码是否一定要使用递归锁;
- 递归锁比起非递归锁,效率会低;
- 递归锁虽然允许同一个线程多次获得同一个互斥量,但可重复获得的最大次数并未具体说明,一旦超过一定的次数,再对lock进行调用就会抛出std::system错误。
std::timed_mutex
std::timed_mutex比std::mutex多了两个超时获取锁的接口:try_lock_for和try_lock_until,前者提供一个超时时间,如果超时时间还没获取到锁,就直接返回false。不在获取锁。
lock_guard和unique_lock
在前面使用mutex的时候,需要手动使用lock和unlock,这可能会存在以下问题:
1.程序代码有多个返回的地方,如果在其中一处没有调用unlock,会导致程序其他地方无法获取到锁。
2.如果程序在某个地方抛出异常,那么也可能不会去unlock锁。一样会造成程序其他地方无法获取锁
相对于手动lock和unlock,我们可以使用RAII(通过类的构造析构)来实现更好的编码方式。这里涉及到unique_lock,lock_guard的使用。
下面显示了使用unique_lock的例子
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <stdexcept> // std::logic_error
std::mutex mtx;
void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id (int id) {
try {
// using a local lock_guard to lock mtx guarantees unlocking on
destruction / exception:
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
以上代码如果我们传递给print_thread_id的数为奇数,就会抛出异常,但是使用std::lock_guard 也会自动调用unlock,不会造成死锁问题。
这里的lock_guard换成unique_lock是一样的。但是unique_lock具有以下好处:
- unique_lock与lock_guard都能实现自动加锁和解锁,但是前者更加灵活,能实现更多的功能。
- unique_lock可以进行临时解锁和再上锁,如在构造对象之后使用lck.unlock()就可以进行解锁,lck.lock()进行上锁,而不必等到析构时自动解锁。
但是unique_lock比lock_guard需要付出更多的时间和性能成本。
使用以下代码模拟一个生产消费者模式
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void push() {
while (true) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one();
sleep(10);
}
}
void pop() {
while (true) {
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker, [](){return !q.empty();});
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "thread2 get value form thread1: " << data << std::endl;
}
}
int main() {
std::thread t1(push);
std::thread t2(pop);
t1.join();
t2.join();
return 0;
}
上述代码使用条件变量,条件变量的目的就是为了,在没有获得某种提醒时长时间休眠; 如果正常情况下, 我们需要一直循环(+sleep), 这样的问题就是CPU消耗+时延问题,条件变量的意思是在cond.wait这里一直休眠直到cond.notify_one()唤醒才开始执行下一句; 还有cond.notify_all()接口用于唤醒所有等待的线程。
条件变量
互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。但单靠互斥量无法实现线程的同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。
条件变量使用过程:
1. 拥有条件变量的线程获取互斥量;
2. 循环检查某个条件,如果条件不满足则阻塞直到条件满足;如果条件满足则向下执行;
3. 某个线程满足条件执行完之后调用notify_one或notify_all唤醒一个或者所有等待线程。
条件变量提供了两类操作:wait和notify。这两类操作构成了多线程同步的基础。
成员函数
wait函数
void wait (unique_lock<mutex>& lck);
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
包含两种重载,第一种只包含unique_lock对象,另外一个Predicate 对象(等待条件),这里必须使用unique_lock,因为wait函数的工作原理:
- 当前线程调用wait()后将被阻塞并且函数会解锁互斥量,直到另外某个线程调用notify_one或者notify_all唤醒当前线程;一旦当前线程获得通知(notify),wait()函数也是自动调用lock(),同理不能使用lock_guard对象。
- 如果wait没有第二个参数,第一次调用默认条件不成立,直接解锁互斥量并阻塞到本行,直到某一个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后无条件地继续进行后面的操作。
- 如果wait包含第二个参数,如果第二个参数不满足,那么wait将解锁互斥量并堵塞到本行,直到某一个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后继续判断第二个参数,如果表达式为false,wait对互斥量解锁,然后休眠,如果为true,则进行后面的操作。
wait_for函数
template <class Rep, class Period>
bool wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time);
template <class Rep, class Period, class Predicate>
bool wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time, Predicate
pred);
和wait不同的是,wait_for可以执行一个时间段,在线程收到唤醒通知或者时间超时之前,该线程都会处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似。
wait_until函数
template <class Clock, class Duration>
bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time,Predicate pred);
与wait_for类似,只是wait_until可以指定一个时间点,在当前线程收到通知或者指定的时间点超时之前,该线程都会处于阻塞状态。如果超时或者收到唤醒通知,wait_until返回,剩下操作和wait类似
notify_one函数
void notify_one() noexcept
解锁正在等待当前条件的线程中的一个,如果没有线程在等待,则函数不执行任何操作,如果正在等待的线程多余一个,则唤醒的线程是不确定的。
notify_all函数
void notify_all() noexcept;
解锁正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。
下面使用条件变量实现一个生产消费者模式
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
template<typename T>
class SyncQueue
{
private:
bool IsFull() const
{
return _queue.size() == _maxSize;
}
bool IsEmpty() const
{
return _queue.empty();
}
public:
SyncQueue(int maxSize) : _maxSize(maxSize)
{
}
void Put(const T& x)
{
std::lock_guard<std::mutex> locker(_mutex);
while (IsFull())
{
std::cout << "full wait..." << std::endl;
_notFull.wait(_mutex);
}
_queue.push_back(x);
_notEmpty.notify_one();
}
void Take(T& x)
{
std::lock_guard<std::mutex> locker(_mutex);
while (IsEmpty())
{
std::cout << "empty wait.." << std::endl;
_notEmpty.wait(_mutex);
}
x = _queue.front();
_queue.pop_front();
_notFull.notify_one();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.size() == _maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.size();
}
int Count()
{
return _queue.size();
}
private:
std::list<T> _queue; //缓冲区
std::mutex _mutex; //互斥量和条件变量结合起来使用
std::condition_variable_any _notEmpty;//不为空的条件变量
std::condition_variable_any _notFull; //没有满的条件变量
int _maxSize; //同步队列最大的size
};
异步操作
异步操作涉及到几个模板对象:std::future,std::aysnc,std::promise,std::packaged_task.
std::future
std::future期待一个返回,从一个异步调用的角度来说,future更像是执行函数的返回值,C++标准库使用std::future为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。
异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使用前一个异步调用的结果。这个时候就要用到future。
线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事件),所以future代表的是一次性事件。
std::future是一个模板,模板参数就是期待返回的类型,虽然future被用于线程间通信,但其本身却并不提供同步访问.
跟thread类似,async允许你通过将额外的参数添加到调用中,来将附加参数传递给函数。如果传入的函数指针是某个类的成员函数,则还需要将类对象指针传入(直接传入,传入指针,或者是std::ref封装)。默认情况下,std::async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于你给的参数。这个参数为std::launch类型
- std::launch::defered表明该函数会被延迟调用,直到在future上调用get()或者wait()为止
- std::launch::async,表明函数会在自己创建的线程上运行
- std::launch::any = std::launch::defered | std::launch::async
- std::launch::sync = std::launch::defered
#include <iostream>
#include <future>
#include <thread>
using namespace std;
int find_result_to_add()
{
// std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟的影响
return 1 + 1;
}
int find_result_to_add2(int a, int b)
{
// std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟的影响
return a + b;
}
void do_other_things()
{
std::cout << "Hello World" << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(5));
}
int main()
{
// std::future<int> result = std::async(find_result_to_add);
std::future<decltype (find_result_to_add())> result =
std::async(find_result_to_add);
do_other_things();
std::cout << "result: " << result.get() << std::endl; // 延迟是否有影响?
// std::future<decltype (find_result_to_add2(int, int))> result2 =
std::async(find_result_to_add2, 10, 20); //错误
std::future<decltype (find_result_to_add2(0, 0))> result2 =
std::async(find_result_to_add2, 10, 20);
std::cout << "result2: " << result2.get() << std::endl; // 延迟是否有影响?
return 0;
}
使用std::future和std::async结合可以实现异步的功能,当异步功能实现完成之后,需要同步的时候,调用future的get函数可以得到执行后的结果,如果异步功能还没执行完成,那么get函数就会阻塞等待异步函数执行完成。
std::packaged_task
如果说std::async和std::feature还是分开看的关系的话,那么std::packaged_task就是将任务和feature绑定在一起的模板,是一种封装对任务的封装。
可以通过std::packaged_task对象获取任务相关联的feature,调用get_future()方法可以获得
std::packaged_task对象绑定的函数的返回值类型的future。std::packaged_task的模板参数是函数签名。
#include <iostream>
#include <future>
using namespace std;
int add(int a, int b)
{
return a + b;
}
void do_other_things()
{
std::cout << "Hello World" << std::endl;
}
int main()
{
std::packaged_task<int(int, int)> task(add);
do_other_things();
std::future<int> result = task.get_future();
task(1, 1); //必须要让任务执行,否则在get()获取future的值时会一直阻塞
std::cout << result.get() << std::endl;
return 0;
}
std::promise
从字面意思上理解promise代表一个承诺。promise比std::packaged_task抽象层次低。
std::promise提供了一种设置值的方式,它可以在这之后通过相关联的std::future对象进行读取。换种说法,之前已经说过std::future可以读取一个异步函数的返回值了,那么这个std::promise就提供一种方式手动让future就绪。
#include <future>
#include <string>
#include <thread>
#include <iostream>
using namespace std;
void print(std::promise<std::string>& p)
{
p.set_value("There is the result whitch you want.");
}
void do_some_other_things()
{
std::cout << "Hello World" << std::endl;
}
int main()
{
std::promise<std::string> promise;
std::future<std::string> result = promise.get_future();
std::thread t(print, std::ref(promise));
do_some_other_things();
std::cout << result.get() << std::endl;
t.join();
return 0;
}
由此可以看出在promise创建好的时候future也已经创建好了线程在创建promise的同时会获得一个future,然后将promise传递给设置他的线程,当前线程则持有future,以便随时检查是否可以取值。
总结
future的表现为期望,当前线程持有future时,期望从future获取到想要的结果和返回,可以把future当做异步函数的返回值。而promise是一个承诺,当线程创建了promise对象后,这个promise对象向线程承诺他必定会被人设置一个值,和promise相关联的future就是获取其返回的手段。
function和bind的用法
在设计回调函数的时候,无可避免地会接触到可回调对象。在C++11中,提供了std::function和
std::bind两个方法来对可回调对象进行统一和封装。
C++语言中有几种可调用对象:函数、函数指针、lambda表达式、bind创建的对象以及重载了函数调用运算符的类。
和其他对象一样,可调用对象也有类型。例如,每个lambda有它自己唯一的(未命名)类类型;函数及函数指针的类型则由其返回值类型和实参类型决定。
function的用法
保存普通函数
void printA(int a)
{
cout << a << endl;
}
std::function<void(int a)> func;
func = printA;
func(2); //2
保存lambda表达式
std::function<void()> func_1 = [](){cout << "hello world" << endl;};
func_1(); //hello world
保存成员函数
class Foo{
Foo(int num) : num_(num){}
void print_add(int i) const {cout << num_ + i << endl;}
int num_;
};
//保存成员函数
std::function<void(const Foo&,int)> f_add_display = &Foo::print_add;
Foo foo(2);
f_add_display(foo,1);
bind的用法
可将bind函数看作是一个通用的函数适配器,它接受一个可调用对象,生成一个新的可调用对象来“适应”原对象的参数列表。
调用bind的一般形式:auto newCallable = bind(callable,arg_list);
其中,newCallable本身是一个可调用对象,arg_list是一个逗号分隔的参数列表,对应给定的callable的参数。即,当我们调用newCallable时,newCallable会调用callable,并传给它arg_list中的参数。
arg_list中的参数可能包含形如n的名字,其中n是一个整数,这些参数是“占位符”,表示newCallable的参数,它们占据了传递给newCallable的参数的“位置”。数值n表示生成的可调用对象中参数的位置:1为newCallable的第一个参数,_2为第二个参数,以此类推。
#include <iostream>
#include <functional>
using namespace std;
void fun_1(int x,int y,int z)
{
cout<<"print: x=" <<x<<",y="<< y << ",z=" <<z<<endl;
}
void fun_2(int &a,int &b)
{
a++;
b++;
cout<<"print: a=" <<a<<",b="<<b<<endl;
}
int main()
{
//f1的类型为 function<void(int, int, int)>
auto f1 = std::bind(fun_1,1,2,3); //表示绑定函数 fun 的第一,二,三个参数值为: 1 2 3
f1(); //print: x=1,y=2,z=3
auto f2 = std::bind(fun_1, placeholders::_1,placeholders::_2,3);
//表示绑定函数 fun 的第三个参数为 3,而fun 的第一,二个参数分别由调用 f2 的第一,二个参数指定
f2(1,2);//print: x=1,y=2,z=3
}
线程池
有了前面知识的铺垫, 在来看线程池的代码,就容易多了。
头文件
//zero_threadpool.h
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H
#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;
void getNow(timeval *tv);
int64_t getNowMs();
#define TNOW getNow()
#define TNOWMS getNowMs()
/
/**
* @file zero_thread_pool.h
* @brief 线程池类,采用c++11来实现了,
* 使用说明:
* ZERO_ThreadPool tpool;
* tpool.init(5); //初始化线程池线程数
* //启动线程方式
* tpool.start();
* //将任务丢到线程池中
* tpool.exec(testFunction, 10); //参数和start相同
* //等待线程池结束
* tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)
* //此时: 外部需要结束线程池是调用
* tpool.stop();
* 注意:
* ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
* int testInt(int i)
* {
* return i;
* }
* auto f = tpool.exec(testInt, 5);
* cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5
*
* class Test
* {
* public:
* int test(int i);
* };
* Test t;
* auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
* //返回的future对象, 可以检查是否执行
* cout << f.get() << endl;
*/
class ZERO_ThreadPool
{
protected:
//任务结构体
struct TaskFunc
{
TaskFunc(uint64_t expireTime) : _expireTime(expireTime)//超时事件
{ }
std::function<void()> _func;//要执行的任务类型
int64_t _expireTime = 0;//超时的绝对时间
};
typedef shared_ptr<TaskFunc> TaskFuncPtr; //任务智能指针的结构体
public:
/**
* @brief 构造函数
*
*/
ZERO_ThreadPool();
/**
* @brief 析构, 会停止所有线程
*/
virtual ~ZERO_ThreadPool();
/**
* @brief 初始化.
*
* @param num 工作线程个数
*/
bool init(size_t num);
/**
* @brief 获取线程个数.
*
* @return size_t 线程个数
*/
size_t getThreadNum()
{
std::unique_lock<std::mutex> lock(_mutex);
return _threads.size();
}
/**
* @brief 获取当前线程池的任务数
*
* @return size_t 线程池的任务数
*/
size_t getJobNum()
{
std::unique_lock<std::mutex> lock(_mutex);
return _tasks.size();
}
/**
* @brief 停止所有线程, 会等待所有线程结束
*/
void stop();
/**
* @brief 启动所有线程
*/
bool start(); // 创建线程
/**
* @brief 用线程池启用任务(F是function, Args是参数)
*
* @param ParentFunctor
* @param tf
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
return exec(0,f,args...);
}
/**
* @brief 用线程池启用任务(F是function, Args是参数)
*
* @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
* @param bind function
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
/*
template <class F, class... Args>
它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
返回值后置
*/
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间
//定义返回值类型
using RetType = decltype(f(args...)); // 推导返回值
//封装任务 ,task是一个智能指针,原对象是packaged_task
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime); // 封装任务指针,设置过期时间
fPtr->_func = [task]() { // 具体执行的函数
(*task)();//等到要执行的任务
};
std::unique_lock<std::mutex> lock(_mutex);
_tasks.push(fPtr); // 插入任务
_condition.notify_one(); // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
return task->get_future();
}
/**
* @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
*
* @param millsecond 等待的时间(ms), -1:永远等待
* @return true, 所有工作都处理完毕
* false,超时退出
*/
bool waitForAllDone(int millsecond = -1);
protected:
/**
* @brief 获取任务
*
* @return TaskFuncPtr
*/
bool get(TaskFuncPtr&task);
/**
* @brief 线程池是否退出
*/
bool isTerminate() { return _bTerminate; }
/**
* @brief 每个线程要执行的任务
* 1.循环判断是否结束
* 2.从队列中获取一个任务
* 3.执行任务
*/
void run();
protected:
/**
* 任务队列(先进先出,使用queue)
*/
queue<TaskFuncPtr> _tasks;
/**
* 工作线程
*/
std::vector<std::thread*> _threads;
std::mutex _mutex;
std::condition_variable _condition;//
size_t _threadNum;//线程数量
bool _bTerminate;//是否终止的标志
std::atomic<int> _atomic{ 0 };//原子变量(用于执行任务时,确定)
};
#endif // ZERO_THREADPOOL_H
源文件
#include "pch.h"
#include "zero_threadpool.h"
ZERO_ThreadPool::ZERO_ThreadPool()
: _threadNum(1), _bTerminate(false)
{
}
ZERO_ThreadPool::~ZERO_ThreadPool()
{
stop();
}
bool ZERO_ThreadPool::init(size_t num)
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_threads.empty())//如果已经初始化了线程
{
return false;
}
_threadNum = num;//设置线程数
return true;
}
void ZERO_ThreadPool::stop()
{
{
std::unique_lock<std::mutex> lock(_mutex);
_bTerminate = true;
_condition.notify_all();
}
for (size_t i = 0; i < _threads.size(); i++)
{
if(_threads[i]->joinable())
{
_threads[i]->join();
}
delete _threads[i];
_threads[i] = NULL;
}
std::unique_lock<std::mutex> lock(_mutex);
_threads.clear();
}
bool ZERO_ThreadPool::start()
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_threads.empty())
{
return false;
}
//生成线程,执行线程逻辑,并放入到线程集合中
for (size_t i = 0; i < _threadNum; i++)
{
_threads.push_back(new thread(&ZERO_ThreadPool::run, this));
}
return true;
}
//从任务队列中取一个任务
//
bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_tasks.empty())//等待添加任务,如果没有任务,就一直等待任务的到来
{
_condition.wait(lock, [this] { return _bTerminate || !_tasks.empty(); });
}
if (_bTerminate)
return false;
if (!_tasks.empty())
{
task = std::move(_tasks.front()); // 使用了移动语义
_tasks.pop();
return true;
}
return false;
}
void ZERO_ThreadPool::run() // 执行任务的线程
{
//调用处理部分
while (!isTerminate()) // 判断是不是要停止
{
TaskFuncPtr task;
bool ok = get(task); // 读取任务
if (ok)
{
++_atomic;
try
{
if (task->_expireTime != 0 && task->_expireTime < TNOWMS )
{
//超时任务,是否需要处理?
}
else
{
task->_func(); // 执行任务
}
}
catch (...)
{
}
--_atomic;
//任务都执行完毕了
std::unique_lock<std::mutex> lock(_mutex);
if (_atomic == 0 && _tasks.empty())
{
_condition.notify_all(); // 这里只是为了通知waitForAllDone,告知所有线程都执行完毕
}
}
}
}
bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_tasks.empty())
return true;
if (millsecond < 0)
{
_condition.wait(lock, [this] { return _tasks.empty(); });
return true;
}
else
{
return _condition.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return _tasks.empty(); });
}
}
int gettimeofday(struct timeval &tv)
{
#if WIN32
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tv.tv_sec = clock;
tv.tv_usec = wtm.wMilliseconds * 1000;
return 0;
#else
return ::gettimeofday(&tv, 0);
#endif
}
void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
int idx = _buf_idx;
*tv = _t[idx];
if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc)
{
addTimeOffset(*tv, idx);
}
else
{
TC_Common::gettimeofday(*tv);
}
#else
gettimeofday(*tv);
#endif
}
int64_t getNowMs()
{
struct timeval tv;
getNow(&tv);
return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}
原文地址:https://blog.csdn.net/abcd552191868/article/details/135917806
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!