自学内容网 自学内容网

基于libuv实现的C++定时器管理器——TimerManager

在多线程编程中,定时器是一个非常重要的功能,它能够让我们在特定的时间点执行特定的任务。本文将介绍一个基于libuv库实现的C++定时器管理器——TimerManager,它通过创建多个工作线程,每个线程运行一个uv loop来高效地管理定时器任务。

1.TimerManager概述

TimerManager是一个用于管理定时器的类,它在初始化时会根据设置的线程数thread_num创建相应数量的工作线程。每个工作线程中都运行着一个uv loop,这些uv loop负责处理定时器任务。TimerManager提供了线程安全的接口,允许用户在不同线程中提交定时器任务、停止或等待定时器管理器。

2.核心组件

2.1 TimerManager类

TimerManager类是整个定时器管理器的核心,它包含以下几个关键部分:
线程数设置:通过thread_num成员变量设置工作线程的数量。
定时器回调类型:定义了timer_callback_t类型,这是一个函数对象,用于处理定时器到期时的回调操作。
定时器请求结构:TimerRequest结构体封装了定时器的相关信息,如超时时间、重复间隔和回调函数。
工作线程结构:WorkThread结构体代表每个工作线程,包含线程对象、请求队列、异步句柄等。
状态管理:通过status_原子变量管理定时器管理器的状态,如初始状态、启动状态、停止状态等。

2.2 工作线程

每个工作线程都运行在一个独立的uv loop中,它们通过WorkThreadMain函数启动。在该函数中,会初始化uv loop,并创建一个异步句柄async_handle用于接收来自其他线程的定时器请求。工作线程会不断运行uv loop,处理定时器任务,直到收到停止或等待的指令。

2.3 定时器任务处理

当用户通过StartTimer方法提交一个定时器任务时,TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会根据请求的细节创建一个uv_timer_t对象,并启动定时器。当定时器到期时,会调用相应的回调函数。

3. 关键功能

3.1 启动定时器管理器

通过调用Start方法,TimerManager会根据设置的线程数创建相应数量的工作线程,并启动每个线程中的uv loop。在启动过程中,会使用std::promise和std::future机制确保所有工作线程都已成功启动并准备好接收定时器请求。

3.2 提交定时器任务

StartTimer方法允许用户提交定时器任务。用户需要提供超时时间、重复间隔和回调函数。TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会创建uv_timer_t对象并启动定时器。

3.3 停止和等待定时器管理器

Stop方法用于停止定时器管理器,它会通知所有工作线程停止运行,并等待所有已启动的定时器任务完成。Join方法则会等待所有定时器任务主动停止后才返回。

4.基于libuv实现的C++定时器管理器

...

struct TimerManager {
    size_t thread_num = 1;

public:
    // TimerCallbackReturnEnum 这个名字不会被使用, 所以随便起起可以
    enum TimerCallbackReturnEnum : unsigned int {
        /// 该标志设置则表明主动停止 timer.
        kStopTimer = 0x01,

        /// 若觉得 callback 执行时间可能超过了 1ms, 则应该设置该标志.
        kMoreThan1MS = 0x02
    };

    /**
     * timer callback 类型.
     *
     * timer callback 的调用形式如下:
     *
     *  param: status; 若为 0, 则表明当前定时器到期了; 若不为 0, 则表明由于出现了某些错误才会被调用, 此后
     *  当前回调不会再调用.
     *
     *  return; callback 的返回值会被当作位掩码来对待, 参见 TimerCallbackReturnEnum.
     *  unsigned int (unsigned int status);
     */
    using timer_callback_t = std::function<unsigned int(unsigned int)>;

public:

    void Start();

    ~TimerManager() noexcept;

public:
    /* 只有这里的接口才是线程安全的, 即可以在不同线程, 在同一个 TimerManager 实例上调用 Stop(), Join(), 或者
     * StartTimer(). 但是不能在线程 A 上针对同一个 TimerManager 实例(下称 T )调用 Start(), 在线程 B 上调用
     * T.StartTimer().
     */

    /**
     * 停止 TimerManager.
     *
     * 此时尚未被处理的 timer 不会被处理, 即直接以错误状态来调用 timer callback. 此时会一直等待已经处理的 timer
     * 主动停止才会返回.
     */
    void Stop() {
        DoStopOrJoin(TimerManagerStatus::kStop);
        return ;
    }

    /**
     * 停止 TimerManager.
     *
     * 此时会一直等待所有的 timer 主动停止才会返回.
     */
    void Join() {
        DoStopOrJoin(TimerManagerStatus::kJoin);
        return ;
    }

    /**
     * 启动一个定时器.
     *
     * 若 timeout 为 0, 则会立即调用 cb; 否则会在 timeout ms 之后调用 cb; 在第一次调用 cb 之后, 若 repeat
     * 为 0, 则停止当前定时器; 否则之后会每 repeat ms 之后调用一次 cb.
     */
    void StartTimer(uint64_t timeout, uint64_t repeat, const timer_callback_t &cb);

/* 本来这些都是 private 就行了.
 *
 * 但是我想重载个 operator<<(ostream &out, TimerManagerStatus); 本来是把这个重载当作是 static member, 然
 * 后编译报错. 貌似只能作为 non-member, 这样子的话, TimerManagerStatus 也就必须得是 public 了.
 */
public:

    enum class TimerManagerStatus : unsigned int {
        kInitial = 0,
        kStarted,
        kStop,
        kJoin
    };

    struct TimerRequest {
        uint64_t timeout;
        uint64_t repeat;
        timer_callback_t cb;

    public:
        TimerRequest(uint64_t timeout_arg, uint64_t repeat_arg, const timer_callback_t &cb_arg):
            timeout(timeout_arg),
            repeat(repeat_arg),
            cb(cb_arg) {
        }

        unsigned int Call(unsigned int status) noexcept {
            return cb(status);
        }
    };


    struct WorkThread {
        bool started = false;
        std::thread thread;

        // 不变量 31: 加锁顺序, 先 vec_mux 再 handle_mux.
        std::mutex vec_mux;
        /* request_vec 的内存是由 work thread 来分配.
         *
         * 不变量 4: 对于其他线程而言, 其检测到若 request_vec 为 nullptr, 则表明对应的 work thread 不再工作,
         * 此时不能往 request_vec 中加入请求. 反之, 则表明 work thread 正常工作, 此时可以压入元素.
         */
        std::unique_ptr<std::vector<std::unique_ptr<TimerRequest>>> request_vec;

        std::shared_mutex handle_mux;
        /* 不变量 3: 若 async_handle != nullptr, 则表明 async_handle 指向着的 uv_async_t 已经被初始化, 此时
         * 对其调用 uv_async_send() 不会触发 SIGSEGV.
         *
         * 其实这里可以使用读写锁, 因为 uv_async_send() 是线程安全的, 但是 uv_close(), uv_async_init() 这些
         * 并不是. 也即在执行 uv_async_send() 之前加读锁, 其他操作加写锁.
         */
        uv_async_t *async_handle = nullptr;

    public:
        void AsyncSend() noexcept {
            handle_mux.lock_shared();
            if (async_handle) {
                uv_async_send(async_handle); // 当 send() 失败了怎么办???
            }
            handle_mux.unlock_shared();
            return ;
        }

        /**
         * 将 timer_req 表示的请求追加到当前 work thread 中.
         *
         * 若抛出异常, 则表明追加失败, 此时 timer_req 引用的对象没有任何变化. 若未抛出异常, 则根据 timer_req
         * 是否为空来判断请求是否成功追加, 即当不为空时, 表明请求成功追加到当前 work thread 中.
         */
        void AddRequest(std::unique_ptr<TimerRequest> &timer_req);
    };

private:
    std::atomic<TimerManagerStatus> status_{TimerManagerStatus::kInitial}; // lock-free
    std::atomic_uint seq_num{0};
    std::unique_ptr<std::vector<WorkThread>> work_threads_;

private:
    TimerManagerStatus GetStatus() noexcept {
        return status_.load(std::memory_order_relaxed);
    }

    void SetStatus(TimerManagerStatus status) noexcept {
        status_.store(status, std::memory_order_relaxed);
        return ;
    }

    void JoinAllThread() noexcept {
        for (WorkThread &work_thread : *work_threads_) {
            if (!work_thread.started)
                continue ;

            work_thread.thread.join(); // join() 理论上不会抛出异常的.
        }
    }

    void DoStopOrJoin(TimerManagerStatus op);

private:
    static void WorkThreadMain(TimerManager *timer_manager, size_t idx, std::promise<void> *p) noexcept;

    static void OnAsyncHandle(uv_async_t* handle) noexcept;
};

inline std::ostream& operator<<(std::ostream &out, TimerManager::TimerManagerStatus status) {
    out << static_cast<unsigned int>(status);
    return out;
}


4. 1示例代码

以下是一个使用TimerManager的示例代码:


...

struct TimerCB {
    int timer_id = 0;
    int times = 0;
    int total_times = 5;

public:
    TimerCB(int id, int total_times_arg) noexcept :
        timer_id(id),
        total_times(total_times_arg) {
        LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times;
    }

    unsigned int operator()(unsigned int status) noexcept {
        LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times
                  << "; status: " << status;

        if (++times >= total_times)
            return TimerManager::kStopTimer;
        return 0;
    }
};

int main(int argc, char **argv) {
    google::SetUsageMessage("TimerManager Test");
    google::SetVersionString("1.0.0");
    google::ParseCommandLineFlags(&argc, &argv, false);

    google::InitGoogleLogging(argv[0]);
    google::InstallFailureSignalHandler();

    TimerManager timer_manager;
    timer_manager.thread_num = FLAGS_work_thread_num;
    timer_manager.Start();

    timer_manager.StartTimer(0, 0, TimerCB(1, 3));
    timer_manager.StartTimer(3000, 0, TimerCB(2, 3));
    timer_manager.StartTimer(0, 2000, TimerCB(3, 3));
    timer_manager.StartTimer(1000, 2000, TimerCB(4, 3));

    timer_manager.Join();
    return 0;
}


If you need the complete source code, please add the WeChat number (c17865354792)

在该示例中,我们首先初始化TimerManager,设置工作线程数,并启动定时器管理器。然后,我们提交了几个定时器任务,每个任务都有不同的超时时间和重复间隔。最后,我们调用Join方法等待所有定时器任务完成。

总结

TimerManager是一个基于libuv实现的高效C++定时器管理器。它通过创建多个工作线程,每个线程运行一个uv loop,实现了对定时器任务的高效管理。TimerManager提供了线程安全的接口,方便用户在多线程环境中使用。通过合理地使用TimerManager,我们可以在C++程序中轻松地实现定时任务的功能。

Welcome to follow WeChat official account【程序猿编码


原文地址:https://blog.csdn.net/chen1415886044/article/details/145245488

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!