自学内容网 自学内容网

C++并发编程实战—单例模式与线程池实现

线程池

C++线程池是一种用于管理和复用线程的机制,它可以提高程序的性能和效率,特别是在处理大量并发任务时。以下是C++线程池的具体细节:

一、定义与功能

  • 定义:线程池是一种设计模式,它预先创建并维护一定数量的线程,这些线程可以重复执行多个任务。当有任务需要执行时,线程池会选择一个可用的线程来执行任务,任务执行完毕后,线程会返回线程池,等待下一个任务的到来。
  • 功能
    • 降低线程创建和销毁的开销:线程的创建和销毁是比较耗费资源的操作,使用线程池可以避免频繁地创建和销毁线程,从而提高程序的性能。
    • 提高系统的响应速度:线程池中的线程可以立即执行任务,而不需要等待线程的创建和启动时间。
    • 控制并发线程数:线程池可以限制同时执行的线程数量,避免系统资源被过度占用,提高系统的稳定性。
    • 提供线程的管理和监控机制:线程池可以统一管理线程的状态、生命周期和执行情况,方便监控和调试。

二、实现原理

  • 创建线程:线程池在初始化时会创建一组线程,这些线程一般会一直存在并处于等待状态,以等待任务的到来。
  • 任务队列:线程池会维护一个任务队列,用于存储需要执行的任务。任务队列通常是一个先进先出的数据结构,如队列(queue)。
  • 任务分发:当有任务需要执行时,线程池会将任务添加到任务队列中。空闲的线程会从任务队列中取出任务并执行。执行完任务后,线程会再次进入等待状态,直到有新的任务到来。
  • 线程管理:线程池会在任务执行完毕后,重新将线程放回线程池中,以便下次使用。线程池还会管理线程的生命周期,包括线程的创建、启动、停止和销毁等。

三、关键组件

  • 线程池类:通常包含一个构造函数用于初始化线程池(包括创建线程和任务队列),一个析构函数用于销毁线程池并清理资源,以及添加任务、启动线程池、停止线程池等成员函数。
  • 任务队列:用于存储待执行的任务。线程池中的线程会从任务队列中取出任务并执行。任务队列通常是一个线程安全的队列,以确保多个线程可以安全地访问和修改它。
  • 线程数组:用于存储线程池中的线程。这些线程在创建后会一直存在,并等待任务的到来。线程数组的大小通常是根据系统的硬件环境和应用的需求来配置的。
  • 同步机制:用于确保线程池中的线程可以安全地访问和修改共享资源(如任务队列)。通常使用互斥锁(mutex)和条件变量(condition variable)等同步机制来实现。

四、使用场景与注意事项

  • 使用场景:线程池适用于需要处理大量并发任务的应用程序,如Web服务器、数据库连接池、图像处理等。
  • 注意事项
    • 需要合理配置线程池的大小和任务队列的大小等参数,以达到最佳的性能和资源利用率。
    • 如果线程池中的线程长时间闲置而不被使用,可能会导致资源的浪费和泄露。因此,需要定期检查和清理线程池中的线程。
    • 在使用线程池时,如果任务之间存在依赖关系,可能会引发死锁问题。需要额外的注意和处理来避免死锁的发生。

五、代码实现

#include <iostream>
#include <thread>
#include <functional>
#include <vector>
#include <queue>
#include <string>
#include <condition_variable>
#include <mutex>
#include <chrono>

class ThreadPool {
private:
    std::vector<std::thread> threads;//线程数组
    std::queue<std::function<void()>> tasks;//任务队列
    std::mutex mtx;//互斥锁
    std::condition_variable condition;//条件变量
    bool Terminate;//线程池是否终止

    //构造函数
    ThreadPool(int numThreads) : Terminate(false) {
        for (int i = 0; i < numThreads; i++) {
            //创建线程以及回调函数
            threads.emplace_back([this]() {
                while (1) {
                    if (Terminate && tasks.empty()) {
                        return;
                    }
                    std::unique_lock<std::mutex> lock(mtx);//加锁
                    condition.wait(lock, [&]() {
                        return !tasks.empty();
                    });//等待条件变量

                    //取出任务
                    auto task(std::move(tasks.front()));//移动语义:右值引用
                    tasks.pop();
                    lock.unlock();//解锁
                    task();//执行任务
                }
            });
        }
    }

    //禁用拷贝构造函数和赋值运算符
    ThreadPool(const ThreadPool &)=delete;
    ThreadPool operator=(const ThreadPool&)=delete;

    //创建或获取静态实例对象
    static ThreadPool& getInstanceHelper(int numThreads){
        static ThreadPool instance(numThreads);//注意:静态局部变量不管调用多次,其只会初始化一次
        return instance;
    }

public:
    //析构函数
    ~ThreadPool(){
        {
            std::unique_lock<std::mutex> lock(mtx);//加锁
            Terminate = true;//终止线程池
        }
        condition.notify_all();//唤醒所有线程,完成全部任务
        //等待所有线程结束
        for (auto &thread:threads) {
            thread.join();//汇入主线程
        }
    }

    //添加任务
    template<class T,class...Args>
    void enqueue(T &&f,Args &&...args){
        std::function<void()> task=std::bind(std::forward<T>(f),std::forward<Args>(args)...);//绑定任务
        {
            std::unique_lock<std::mutex> lock(mtx);//加锁
            tasks.emplace(std::move(task));//添加任务
        }
        condition.notify_one();//唤醒一个线程
    }

    //获取线程池实例
    static ThreadPool& getInstance(int numThreads){
        return getInstanceHelper(numThreads);
    }
};

int main(){
    ThreadPool& pool=ThreadPool::getInstance(4);//创建线程池
    //添加任务
    for (int i = 1; i <= 10; ++i) {
        pool.enqueue([=](){
            std::cout<<"任务:"<<i<<"正在运行"<<std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));//模拟任务执行时间
            std::cout<<"任务:"<<i<<"运行完毕"<<std::endl;
        });
    }
    return 0;
}

这段代码实现了一个简单的线程池 ThreadPool 类,它允许用户创建指定数量的线程,并将任务(std::function<void()> 类型的函数对象)添加到线程池中,由线程池中的线程异步执行。以下是对代码的主要部分的解释和一些潜在问题的讨论:

1.构造函数

  • 构造函数接受一个整数 numThreads,表示要创建的线程数量。
  • 使用一个 std::queue<std::function<void()>> 来存储待执行的任务。
  • 使用 std::mutexstd::condition_variable 来同步对任务队列的访问,并确保当有新任务添加到队列时,能够唤醒一个等待中的线程。
  • 每个线程在一个无限循环中运行,检查是否有任务要执行。如果 Terminate 标志被设置为 true 且任务队列为空,线程将退出循环并结束。

2.线程池实例管理

  • 通过一个私有的静态成员函数 getInstanceHelper 和一个公共的静态成员函数 getInstance 来实现单例模式,确保整个程序中只有一个 ThreadPool 实例。
  • getInstanceHelper 函数利用局部静态变量的特性来确保线程安全的延迟初始化。

3.添加任务

  • enqueue 成员函数模板允许用户将任何可调用对象(函数、lambda 表达式、绑定表达式等)作为任务添加到线程池中。
  • 使用 std::bindstd::forward 来完美转发参数,确保任务可以被正确地绑定和存储。
  • 每次添加任务后,使用 condition.notify_one() 来唤醒一个等待中的线程。

4.析构函数

  • 在析构函数中,首先将 Terminate 标志设置为 true,然后通知所有等待中的线程。
  • 等待所有线程执行完当前任务后结束(通过调用 thread.join())。

5.潜在问题和改进

  1. 任务顺序和并发性:由于任务是并发执行的,因此任务的执行顺序是不确定的。如果需要按顺序执行任务,需要额外的同步机制。
  2. 异常处理:当前代码中没有处理任务执行过程中可能抛出的异常。在实际应用中,应该添加适当的异常处理逻辑,以确保线程池的稳定性和健壮性。
  3. 性能优化:对于高并发场景,可以考虑使用无锁数据结构或其他并发技术来减少锁竞争,提高性能。
  4. 资源清理:在析构函数中,如果某个线程正在执行任务时抛出异常,可能会导致 thread.join() 调用失败。应该添加适当的错误处理逻辑来确保资源能够被正确清理。
  5. 任务取消:当前线程池不支持任务取消功能。如果需要,可以添加任务标识符和取消机制。

6.主函数

  • 主函数中,创建了一个包含 4 个线程的线程池,并添加了 10 个任务。每个任务打印一条消息,休眠一秒,然后再次打印一条消息,模拟任务的执行。

这段代码是一个很好的线程池实现示例,展示了如何使用 C++11 线程库来构建并发应用程序。然而,在实际应用中,还需要考虑异常处理、性能优化和资源清理等额外因素。


原文地址:https://blog.csdn.net/weixin_45706195/article/details/142704540

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!