基于libuv实现的C++定时器管理器——TimerManager
在多线程编程中,定时器是一个非常重要的功能,它能够让我们在特定的时间点执行特定的任务。本文将介绍一个基于libuv库实现的C++定时器管理器——TimerManager,它通过创建多个工作线程,每个线程运行一个uv loop来高效地管理定时器任务。
1.TimerManager概述
TimerManager是一个用于管理定时器的类,它在初始化时会根据设置的线程数thread_num创建相应数量的工作线程。每个工作线程中都运行着一个uv loop,这些uv loop负责处理定时器任务。TimerManager提供了线程安全的接口,允许用户在不同线程中提交定时器任务、停止或等待定时器管理器。
2.核心组件
2.1 TimerManager类
TimerManager类是整个定时器管理器的核心,它包含以下几个关键部分:
线程数设置:通过thread_num成员变量设置工作线程的数量。
定时器回调类型:定义了timer_callback_t类型,这是一个函数对象,用于处理定时器到期时的回调操作。
定时器请求结构:TimerRequest结构体封装了定时器的相关信息,如超时时间、重复间隔和回调函数。
工作线程结构:WorkThread结构体代表每个工作线程,包含线程对象、请求队列、异步句柄等。
状态管理:通过status_原子变量管理定时器管理器的状态,如初始状态、启动状态、停止状态等。
2.2 工作线程
每个工作线程都运行在一个独立的uv loop中,它们通过WorkThreadMain函数启动。在该函数中,会初始化uv loop,并创建一个异步句柄async_handle用于接收来自其他线程的定时器请求。工作线程会不断运行uv loop,处理定时器任务,直到收到停止或等待的指令。
2.3 定时器任务处理
当用户通过StartTimer方法提交一个定时器任务时,TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会根据请求的细节创建一个uv_timer_t对象,并启动定时器。当定时器到期时,会调用相应的回调函数。
3. 关键功能
3.1 启动定时器管理器
通过调用Start方法,TimerManager会根据设置的线程数创建相应数量的工作线程,并启动每个线程中的uv loop。在启动过程中,会使用std::promise和std::future机制确保所有工作线程都已成功启动并准备好接收定时器请求。
3.2 提交定时器任务
StartTimer方法允许用户提交定时器任务。用户需要提供超时时间、重复间隔和回调函数。TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会创建uv_timer_t对象并启动定时器。
3.3 停止和等待定时器管理器
Stop方法用于停止定时器管理器,它会通知所有工作线程停止运行,并等待所有已启动的定时器任务完成。Join方法则会等待所有定时器任务主动停止后才返回。
4.基于libuv实现的C++定时器管理器
...
struct TimerManager {
size_t thread_num = 1;
public:
// TimerCallbackReturnEnum 这个名字不会被使用, 所以随便起起可以
enum TimerCallbackReturnEnum : unsigned int {
/// 该标志设置则表明主动停止 timer.
kStopTimer = 0x01,
/// 若觉得 callback 执行时间可能超过了 1ms, 则应该设置该标志.
kMoreThan1MS = 0x02
};
/**
* timer callback 类型.
*
* timer callback 的调用形式如下:
*
* param: status; 若为 0, 则表明当前定时器到期了; 若不为 0, 则表明由于出现了某些错误才会被调用, 此后
* 当前回调不会再调用.
*
* return; callback 的返回值会被当作位掩码来对待, 参见 TimerCallbackReturnEnum.
* unsigned int (unsigned int status);
*/
using timer_callback_t = std::function<unsigned int(unsigned int)>;
public:
void Start();
~TimerManager() noexcept;
public:
/* 只有这里的接口才是线程安全的, 即可以在不同线程, 在同一个 TimerManager 实例上调用 Stop(), Join(), 或者
* StartTimer(). 但是不能在线程 A 上针对同一个 TimerManager 实例(下称 T )调用 Start(), 在线程 B 上调用
* T.StartTimer().
*/
/**
* 停止 TimerManager.
*
* 此时尚未被处理的 timer 不会被处理, 即直接以错误状态来调用 timer callback. 此时会一直等待已经处理的 timer
* 主动停止才会返回.
*/
void Stop() {
DoStopOrJoin(TimerManagerStatus::kStop);
return ;
}
/**
* 停止 TimerManager.
*
* 此时会一直等待所有的 timer 主动停止才会返回.
*/
void Join() {
DoStopOrJoin(TimerManagerStatus::kJoin);
return ;
}
/**
* 启动一个定时器.
*
* 若 timeout 为 0, 则会立即调用 cb; 否则会在 timeout ms 之后调用 cb; 在第一次调用 cb 之后, 若 repeat
* 为 0, 则停止当前定时器; 否则之后会每 repeat ms 之后调用一次 cb.
*/
void StartTimer(uint64_t timeout, uint64_t repeat, const timer_callback_t &cb);
/* 本来这些都是 private 就行了.
*
* 但是我想重载个 operator<<(ostream &out, TimerManagerStatus); 本来是把这个重载当作是 static member, 然
* 后编译报错. 貌似只能作为 non-member, 这样子的话, TimerManagerStatus 也就必须得是 public 了.
*/
public:
enum class TimerManagerStatus : unsigned int {
kInitial = 0,
kStarted,
kStop,
kJoin
};
struct TimerRequest {
uint64_t timeout;
uint64_t repeat;
timer_callback_t cb;
public:
TimerRequest(uint64_t timeout_arg, uint64_t repeat_arg, const timer_callback_t &cb_arg):
timeout(timeout_arg),
repeat(repeat_arg),
cb(cb_arg) {
}
unsigned int Call(unsigned int status) noexcept {
return cb(status);
}
};
struct WorkThread {
bool started = false;
std::thread thread;
// 不变量 31: 加锁顺序, 先 vec_mux 再 handle_mux.
std::mutex vec_mux;
/* request_vec 的内存是由 work thread 来分配.
*
* 不变量 4: 对于其他线程而言, 其检测到若 request_vec 为 nullptr, 则表明对应的 work thread 不再工作,
* 此时不能往 request_vec 中加入请求. 反之, 则表明 work thread 正常工作, 此时可以压入元素.
*/
std::unique_ptr<std::vector<std::unique_ptr<TimerRequest>>> request_vec;
std::shared_mutex handle_mux;
/* 不变量 3: 若 async_handle != nullptr, 则表明 async_handle 指向着的 uv_async_t 已经被初始化, 此时
* 对其调用 uv_async_send() 不会触发 SIGSEGV.
*
* 其实这里可以使用读写锁, 因为 uv_async_send() 是线程安全的, 但是 uv_close(), uv_async_init() 这些
* 并不是. 也即在执行 uv_async_send() 之前加读锁, 其他操作加写锁.
*/
uv_async_t *async_handle = nullptr;
public:
void AsyncSend() noexcept {
handle_mux.lock_shared();
if (async_handle) {
uv_async_send(async_handle); // 当 send() 失败了怎么办???
}
handle_mux.unlock_shared();
return ;
}
/**
* 将 timer_req 表示的请求追加到当前 work thread 中.
*
* 若抛出异常, 则表明追加失败, 此时 timer_req 引用的对象没有任何变化. 若未抛出异常, 则根据 timer_req
* 是否为空来判断请求是否成功追加, 即当不为空时, 表明请求成功追加到当前 work thread 中.
*/
void AddRequest(std::unique_ptr<TimerRequest> &timer_req);
};
private:
std::atomic<TimerManagerStatus> status_{TimerManagerStatus::kInitial}; // lock-free
std::atomic_uint seq_num{0};
std::unique_ptr<std::vector<WorkThread>> work_threads_;
private:
TimerManagerStatus GetStatus() noexcept {
return status_.load(std::memory_order_relaxed);
}
void SetStatus(TimerManagerStatus status) noexcept {
status_.store(status, std::memory_order_relaxed);
return ;
}
void JoinAllThread() noexcept {
for (WorkThread &work_thread : *work_threads_) {
if (!work_thread.started)
continue ;
work_thread.thread.join(); // join() 理论上不会抛出异常的.
}
}
void DoStopOrJoin(TimerManagerStatus op);
private:
static void WorkThreadMain(TimerManager *timer_manager, size_t idx, std::promise<void> *p) noexcept;
static void OnAsyncHandle(uv_async_t* handle) noexcept;
};
inline std::ostream& operator<<(std::ostream &out, TimerManager::TimerManagerStatus status) {
out << static_cast<unsigned int>(status);
return out;
}
4. 1示例代码
以下是一个使用TimerManager的示例代码:
...
struct TimerCB {
int timer_id = 0;
int times = 0;
int total_times = 5;
public:
TimerCB(int id, int total_times_arg) noexcept :
timer_id(id),
total_times(total_times_arg) {
LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times;
}
unsigned int operator()(unsigned int status) noexcept {
LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times
<< "; status: " << status;
if (++times >= total_times)
return TimerManager::kStopTimer;
return 0;
}
};
int main(int argc, char **argv) {
google::SetUsageMessage("TimerManager Test");
google::SetVersionString("1.0.0");
google::ParseCommandLineFlags(&argc, &argv, false);
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
TimerManager timer_manager;
timer_manager.thread_num = FLAGS_work_thread_num;
timer_manager.Start();
timer_manager.StartTimer(0, 0, TimerCB(1, 3));
timer_manager.StartTimer(3000, 0, TimerCB(2, 3));
timer_manager.StartTimer(0, 2000, TimerCB(3, 3));
timer_manager.StartTimer(1000, 2000, TimerCB(4, 3));
timer_manager.Join();
return 0;
}
If you need the complete source code, please add the WeChat number (c17865354792)
在该示例中,我们首先初始化TimerManager,设置工作线程数,并启动定时器管理器。然后,我们提交了几个定时器任务,每个任务都有不同的超时时间和重复间隔。最后,我们调用Join方法等待所有定时器任务完成。
总结
TimerManager是一个基于libuv实现的高效C++定时器管理器。它通过创建多个工作线程,每个线程运行一个uv loop,实现了对定时器任务的高效管理。TimerManager提供了线程安全的接口,方便用户在多线程环境中使用。通过合理地使用TimerManager,我们可以在C++程序中轻松地实现定时任务的功能。
Welcome to follow WeChat official account【程序猿编码】
原文地址:https://blog.csdn.net/chen1415886044/article/details/145245488
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!