自学内容网 自学内容网

【c++并发编程】线程池实现

参考https://shanhai.huawei.com/#/page-forum/post-details?postId=43796

完整代码

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <atomic>

// 任务优先级结构体
struct PriorityTask {
    int priority;
    std::function<void()> func;

    // 优先级比较,优先级数值越小,优先级越高
    bool operator<(const PriorityTask& other) const {
        return priority > other.priority;
    }
};

// 线程池类
class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    PriorityTask task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.top());
                        this->tasks.pop();
                    }

                    task.func();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

    template<class F, class... Args>
    auto enqueue(int priority, F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);

            // Don't allow enqueueing after stopping the pool
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
        }
        condition.notify_one();
        return res;
    }

private:
    // Need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // The task queue
    std::priority_queue< PriorityTask > tasks;

    // Synchronization
    std::mutex queueMutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
};

// 使用示例
int main() {
    ThreadPool pool(4);

    auto result1 = pool.enqueue(1, []() -> int {
        std::cout << "Executing task 1" << std::endl;
        return 1;
    });

    auto result2 = pool.enqueue(0, []() -> int {
        std::cout << "Executing task 2" << std::endl;
        return 2;
    });

    std::cout << "Task 1 result: " << result1.get() << std::endl;
    std::cout << "Task 2 result: " << result2.get() << std::endl;

    return 0;
}

任务定义

// 任务优先级结构体
struct PriorityTask {
    int priority;
    std::function<void()> func;

    // 优先级比较,优先级数值越小,优先级越高
    bool operator<(const PriorityTask& other) const {
        return priority > other.priority;
    }
};

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
回调函数是一种编程模式,允许一个函数在其他函数完成某些操作后被调用。回调函数通常用于异步编程,即在等待某些操作完成时,可以执行其他任务,然后在操作完成时调用回调函数。
例如,在网络编程中,当发送一个请求后,可以注册一个回调函数,当接收到响应时,这个回调函数会被调用。这样,程序可以在等待响应时执行其他任务,而不是阻塞等待。
在C++中,可以使用std::function对象来存储和调用回调函数。例如:

在这里插入图片描述
在这里插入图片描述

在这个例子中,sendRequest函数接受一个URL和一个回调函数作为参数。在发送请求并接收响应后,它会调用回调函数onResponse,并传递状态码和响应。
需要注意的是,回调函数的使用需要谨慎,因为如果回调函数在被调用前就被销毁,可能会导致未定义行为。因此,通常需要确保回调函数的生命周期足够长,或者使用std::shared_ptr等机制来管理回调函数的生命周期。
总的来说,回调函数是一种强大的编程模式,可以用于实现异步编程、事件驱动编程等复杂功能,通过使用std::function对象,可以方便地存储和调用回调函数。

线程池初始化

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    PriorityTask task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.top());
                        this->tasks.pop();
                    }

                    task.func();
                }
            });
        }
    }

在这里插入图片描述
task = std::move(this->tasks.top())
在这里插入图片描述

std:thread 初始化

#include <thread>
#include <iostream>

void printHello() {
    std::cout << "Hello, world!" << std::endl;
}

int main() {
    std::thread t(printHello);
    t.join();
    return 0;
}

在C++中,std::thread是一个用于表示并发执行线程的类,它是C++标准库中的一部分。std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。
以下是一个简单的例子,展示了如何初始化和使用std::thread对象:

#include <thread>
#include <iostream>

void printHello() {
    std::cout << "Hello, world!" << std::endl;
}

int main() {
    std::thread t(printHello);
    t.join();
    return 0;
}

在这个例子中,std::thread t(printHello);创建了一个std::thread对象t,并将其初始化为执行printHello函数。t.join();等待线程t完成执行,然后继续执行后面的代码。
需要注意的是,std::thread对象在创建后会立即启动线程,并执行指定的任务。如果在线程完成执行之前销毁std::thread对象,可能会导致未定义行为。因此,通常需要确保std::thread对象的生命周期足够长,或者使用std::shared_ptr等机制来管理线程的生命周期。
总的来说,std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。通过使用std::thread,可以方便地实现多线程编程,并行执行任务。
在这里插入图片描述

线程池入队

template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);

        // Don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
    }
    condition.notify_one();
    return res;
}

这段代码是一个线程池类的成员函数,用于将一个新的任务添加到任务队列中。
首先,它使用模板参数F和Args来接受任务的函数和参数。然后,它使用std::bind来创建一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
接着,它使用std::make_shared来创建一个std::packaged_task对象,该对象可以在另一个线程中执行任务。std::packaged_task对象包装了一个可调用对象,并提供了一个get_future方法,该方法返回一个std::future对象,该对象可以用来获取任务的结果。
然后,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改任务队列时不会有其他线程干扰。
在获取锁之后,它检查stop标志,如果线程池已经停止,那么就抛出一个运行时错误。然后,它将新的任务添加到任务队列中,并使用condition.notify_one()来唤醒一个等待condition条件变量的线程。
最后,它返回std::future对象,该对象可以用来获取任务的结果。
总的来说,这个函数的主要作用是将一个新的任务添加到任务队列中,并返回一个std::future对象,该对象可以用来获取任务的结果。

task { (*task)(); }是一个lambda表达式,它是一个没有参数的函数对象。这个lambda表达式在被调用时会执行task中的任务。
在这个例子中,task是一个std::shared_ptr,它指向一个std::packaged_task对象。std::packaged_task对象包装了一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
task { (*task)(); }被调用时,它会解引用task智能指针,然后调用std::packaged_task对象的operator(),这会执行包装的任务。
这种用法的主要目的是将一个任务封装成一个可调用对象,这样就可以将任务作为参数传递给其他函数,或者存储在数据结构中,以便在未来的某个时间点执行。
需要注意的是,lambda表达式是一种匿名函对象,它可以捕获外部变量并在其自身的函数体中使用它们。在这个例子中,lambda表达式捕获了task智能指针,并在其函数体中解引用并调用它。这样就可以在新的线程中执行任务。

在lambda表达式中,&task { (*task)(); }会捕获task智能指针的引用,而不是值。这意味着,如果task智能指针在lambda表达式之外被销毁,那么在lambda表达式中使用task时就会出现悬挂引用,这是一种未定义行为。
因此,正确的做法是使用task { (*task)(); }来捕获task智能指针的值,这样就可以确保task智能指针在lambda表达式执行期间一直有效。
需要注意的是,捕获值会导致lambda表达式拷贝task智能指针
在这里插入图片描述

线程池销毁

~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
}

这段代码是一个线程池的析构函数,它的主要目的是在销毁线程池时正确地关闭所有的工作线程。
首先,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改stop标志时不会有其他线程干扰。然后,它将stop标志设置为true,表示线程池需要停止运行。
接着,它调用condition.notify_all()来唤醒所有正在等待condition条件变量的线程。这些线程通常是工作线程,它们在等待新的任务出现在任务队列中。
最后,它遍历所有的工作线程,并调用join()方法来等待每个线程的结束。join()方法会阻塞当前线程,直到指定的线程结束执行。这样可以确保在销毁线程池时,所有的工作线程都已经正确地结束了。
总的来说,这段代码的目的是在销毁线程池时,正确地关闭所有的工作线程,并确保没有任何线程在运行。

在这里插入图片描述


原文地址:https://blog.csdn.net/qq_38662930/article/details/143040020

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!