自学内容网 自学内容网

从头开始:构建一个基于C/C++的线程池

线程池工作原理和实现

线程池工作原理

线程池(Thread Pool)是一种预先创建和管理一组工作线程的技术,用来优化并发任务的执行。通过复用这些线程来执行多个任务,线程池可以减少线程创建和销毁的开销,提高系统的性能和响应速度。

1. 线程池的基本组成:
  • 任务队列(Task Queue):存放待执行任务的队列。当有新的任务提交时,它会进入任务队列,等待可用线程来执行。
  • 工作线程(Worker Threads):线程池中的一组线程,用来处理任务队列中的任务。线程池启动时会创建一定数量的工作线程。
  • 线程池管理器(Thread Pool Manager):负责管理线程池的大小、任务分配、线程的创建和销毁。
2. 线程池的基本执行流程:
  1. 提交任务:用户向线程池提交任务,通常是实现了 RunnableCallable 接口的对象。任务被放入任务队列中。
  2. 任务分配:线程池中的某个空闲工作线程会从任务队列中取出一个任务,并开始执行。任务队列是线程安全的,确保多个线程可以同时取任务而不发生冲突。
  3. 任务执行:工作线程运行并执行任务中的代码。线程池中的工作线程是循环的,每个线程在完成一个任务后会继续取下一个任务。
  4. 复用线程:任务完成后,线程不会被销毁,而是回到线程池中成为“空闲线程”,等待下一个任务。
  5. 动态调整线程数量:如果任务数量激增,线程池可以根据策略(如扩展线程池的大小)创建新的线程来处理更多的任务;如果任务减少,线程池也可能会销毁一些线程以节省资源。
3. 线程池的核心参数:
  • 核心线程数(corePoolSize):线程池中保持活跃的核心线程数量,即使线程处于空闲状态也不会被回收。
  • 最大线程数(maximumPoolSize):线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据需求创建更多的线程,直到达到最大线程数。
  • 任务队列(workQueue):用于存放等待执行的任务。如果当前所有线程都在忙碌,新的任务会被放入任务队列中。
  • 线程存活时间(keepAliveTime):线程池中超过核心线程数的空闲线程在等待新任务的最长等待时间,超过这个时间后将被销毁。
  • 线程工厂(ThreadFactory):用于创建新线程,方便自定义线程的属性,如线程的名称、优先级等。
  • 拒绝策略(RejectedExecutionHandler):当任务过多而无法处理时(任务队列满了且线程池的线程数已达到上限),线程池会执行拒绝策略,常见策略包括丢弃任务、抛出异常、或由调用者线程直接执行。
4. 线程池的生命周期:
  • 运行状态(RUNNING):线程池处于运行状态,可以接受任务并处理任务。
  • 关闭状态(SHUTDOWN):线程池不再接受新任务,但会继续处理已经提交的任务。
  • 停止状态(STOP):线程池不再接受任务,且会中断正在执行的任务。
  • 终止状态(TERMINATED):所有任务执行完毕,线程池中的线程全部销毁,线程池彻底关闭。
5. 线程池的执行策略:
  • 先使用核心线程:线程池优先利用核心线程来处理任务,只有在核心线程全部繁忙的情况下,才会将任务放入任务队列。
  • 任务队列满了,创建新线程:如果任务队列已满且所有核心线程都在工作,线程池会创建新的线程,直到达到 maximumPoolSize
  • 拒绝任务:如果任务队列满了,且线程池中的线程数量已经达到 maximumPoolSize,根据配置的拒绝策略处理新任务。

相关知识点

线程与进程的比较

  • 线程启动速度快,轻量级
  • 线程使用有一定难度,需要处理数据一致性问题
  • 同一线程共享的有堆、全局变量、静态变量、指针等,而独自占有栈
  • 线程是调度的基本单位(PC、状态码、通用寄存器、线程栈及栈指针);进程是拥有资源的基本单位(打开文件、堆、代码段等)
  • 一个进程内多个线程可以并发;多个进程可以并发
  • 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位
  • 线程的系统开销小,线程创建销毁只需要处理PC值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁task_struct结构。

读写锁

  • 多个读者可以同时进行读
  • 写者必须互斥(只允许一个写者写,也不能读者写者同时进行)
  • 写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)

互斥锁

一次只能一个线程拥有互斥锁,其他线程只有等待。

互斥锁是在抢锁失败的情况下主动放弃CPU进入睡眠状态直到锁的状态改变时再唤醒,而操作系统负责线程调度,为了实现锁的状态发生改变时唤醒阻塞的线程或者进程,需要把锁交给操作系统管理,所以互斥锁在加锁操作时设计上下文的切换。互斥锁实际的效率还是可以让人接受的,加锁的时间大概100ns左右,而实际上互斥锁的一种可能的实现是先自旋一段时间,当自旋的时间超过阈值之后再将线程投入到睡眠中,因此在并发运算中使用互斥锁(每次占用锁的时间很短)的效果可能不亚于使用自旋锁。

互斥锁属于sleep-waiting类型的锁。例如在一个双核的机器上有两个线程A和B,它们分别运行在core 0和core 1上。假设线程A想要通过pthread_mutex_lock操作去得到一个临界区的锁,而此时这个锁正被线程B所持有,那么线程A就会被阻塞,此时会通过上下文切换将线程A置于等待队列中,此时core 0就可以运行其他的任务(如线程C)。

基于C语言的线程池设计与实现

任务队列

typedef struct Task
{
    void (*function) (void* arg); // void*是一个泛型,能够接收各种各样的数据类型
    void* arg;
}Task;

线程池定义

struct ThreadPool
{
    // 任务队列
    Task* taskQ; // 队列数组
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头,取数据
    int queueRear;      // 队尾,放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小的线程数
    int maxNum;             // 最大的线程数
    int busyNum;            // 工作的线程个数
    int liveNum;            // 存活的线程个数
    int exitNum;            // 要杀死的线程个数
    pthread_mutex_t mutexpool;  // 锁住整个线程池
    pthread_mutex_t mutexBusy;  // 锁住busyNum变量
    pthread_cond_t notFull;     // 任务队列是否满了
    pthread_cond_t notEmpty;    // 任务队列是否空了

    int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};

头文件声明

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestory(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

void* worker(void* arg);

void* manager(void* arg);

void threadExit(ThreadPool* pool);

#endif 

源文件定义

#include "threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<stdio.h>

const int NUMBER = 2;

// 任务结构体
typedef struct Task
{   
    // void*是一个泛型,能够接收各种各样的数据类型
    void (*function) (void* arg); // 函数指针
    void* arg;
}Task;


// 线程池结构体
struct ThreadPool
{
    // 任务队列
    Task* taskQ; // 队列数组
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头,取数据
    int queueRear;      // 队尾,放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小的线程数
    int maxNum;             // 最大的线程数
    int busyNum;            // 工作的线程个数
    int liveNum;            // 存活的线程个数
    int exitNum;            // 要杀死的线程个数
    pthread_mutex_t mutexpool;  // 锁住整个线程池
    pthread_mutex_t mutexBusy;  // 锁住busyNum变量
    pthread_cond_t notFull;     // 任务队列是否满了,用于阻塞生产者
    pthread_cond_t notEmpty;    // 任务队列是否空了,用于阻塞消费者

    int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};

ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do {
        if(pool == NULL) {
            printf("malloc threadpool fail....\n");
            break;
        }
        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if(pool->threadIDs == NULL) {
            printf("malloc threadIDs fail....\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min; // 和最小个数相等
        pool->exitNum = 0;

        if(pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
        pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
        pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
        pthread_cond_init(&pool->notFull, NULL) != 0
        ) {
            printf("mutex or condif init fail....\n");
            break;
        }

        // 任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        // 创建管理者线程和工作者线程
        pthread_create(&pool->managerID, NULL, manager, pool); // 第三个参数为管理者线程的任务函数
        for(int i = 0; i < min; i++) {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 第三个参数为工作的线程的任务函数
        }
        return pool;
    } while(0);

    // 释放资源
    if(pool->threadIDs) free(pool->threadIDs);
    if(pool->taskQ) free(pool->taskQ);
    if(pool) free(pool);

    return NULL;
}

int threadPoolDestory(ThreadPool* pool) {
    if(pool == NULL) {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;
    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for(int i = 0; i < pool->liveNum; i++) {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if(pool->taskQ) {
        free(pool->taskQ);
    }
    if(pool->threadIDs) {
        free(pool->threadIDs);
    }
    
    pthread_mutex_destroy(&pool->mutexpool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);
    free(pool);
    pool = NULL;
    return 0;
}

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) {
    pthread_mutex_lock(&pool->mutexpool);
    while(pool->queueSize == pool->queueCapacity && !pool->shutdown) {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexpool);
    }
    if(pool->shutdown) {
        pthread_mutex_unlock(&pool->mutexpool);
        return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
    pthread_mutex_unlock(&pool->mutexpool);
}

int threadPoolBusyNum(ThreadPool* pool) {
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool) {
    pthread_mutex_lock(&pool->mutexpool);
    int liveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexpool);
    return liveNum;
}

void* worker(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    while(1) {
        pthread_mutex_lock(&pool->mutexpool); // 访问线程池之前加锁
        // 当前任务队列是否为空
        while(pool->queueSize == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);

            // 判断是不是要销毁线程
            if(pool->exitNum > 0) {
                pool->exitNum--;
                if(pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexpool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if(pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexpool); // 避免死锁
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 移动头结点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; // 循环队列
        pool->queueSize--;

        pthread_cond_signal(&pool->notFull); // 消费者消费完产品后唤醒生产者
        pthread_mutex_unlock(&pool->mutexpool); // 用完之后解锁

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void* manager(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    while(!pool->shutdown) {
        // 每隔三秒钟检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexpool);
        int queueSize = pool->queueSize;
        int liveNumber = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexpool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNumber = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
        if(queueSize > liveNumber && liveNumber < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexpool);
            int count = 0;
            for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
                if(pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    count++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexpool);
        }

        // 销毁线程
        // 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
        // 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
        if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
            pthread_mutex_lock(&pool->mutexpool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexpool);
            // 让工作的线程自杀
            for(int i = 0; i < NUMBER; i++) {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
}

void threadExit(ThreadPool* pool) {
    pthread_t tid = pthread_self(); // 获得线程自身的ID。
    for(int i = 0; i < pool->maxNum; i++) {
        if(pool->threadIDs[i] == tid) { // tid对应的线程要退出了
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

测试代码

#include<stdio.h>
#include "threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>

void taskFunc(void* arg) {
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main() {
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for(int i = 0; i < 100; i++) {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }

    sleep(30); // 等待子线程把任务处理完毕

    threadPoolDestory(pool);
    return 0;
}

image-20240922170244684

基于C++的线程池设计与实现

因为在C++中,delete task.arg时候,由于delete void*类型是有危险的,因为void*指针只占四个字节,因此有可能不能全部地被释放,为了知道在程序中void*实际上是什么类型,因此在C++中可以使用模板来解决这一问题,因为模板可以传递类型。

任务队列声明

#pragma once
#include<queue>
#include<pthread.h>

using callback = void (*) (void* arg);

// 任务结构体
template <typename T>
struct Task
{   
    Task<T>() {
        function = nullptr;
        arg = nullptr;
    }
    Task<T>(callback f, void* arg) {
        this->arg = (T*)arg;
        function = f;
    }
    callback function; // 函数指针
    T* arg;
};

template <typename T>
class TaskQueue
{
private:
    std::queue <Task<T>> m_taskQ;
    pthread_mutex_t m_mutex; // 互斥锁
public:
    TaskQueue(/* args */);
    ~TaskQueue();

    // 添加任务
    void addTask(Task<T> task);
    void addTask(callback f, void* arg);
    // 取出一个任务
    Task<T> takeTask();
    // 获取当前任务的个数
    inline size_t taskNumber() { // 没有if判断什么的,比较简单的直接写成内联函数比较好
        return m_taskQ.size();
    }
};

任务队列定义

#include "TaskQueue.h"


template <typename T>
TaskQueue<T>::TaskQueue(/* args */)
{
    pthread_mutex_init(&m_mutex, NULL);
}

template <typename T>
TaskQueue<T>::~TaskQueue()
{
    pthread_mutex_destroy(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(task);
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(Task<T>(f, arg));
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
Task<T> TaskQueue<T>::takeTask() 
{
    Task<T> task;
    pthread_mutex_lock(&m_mutex);
    if(!m_taskQ.empty()) {
        task = m_taskQ.front();
        m_taskQ.pop();
    }
    pthread_mutex_unlock(&m_mutex);
    return task;
}

线程池声明

#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"

template <typename T>
class ThreadPool
{
public:
    ThreadPool(int min, int max);
    ~ThreadPool();

    // 添加任务
    void addTask(Task<T> task);
    // 获取忙线程的个数
    int getBusyNumber();
    // 获取活着的线程个数
    int getAliveNumber();

private:
    // 工作的线程的任务函数
    static void* worker(void* arg);
    // 管理者线程的任务函数
    static void* manager(void* arg);
    void threadExit();

    static const int NUMBER = 2;

    pthread_mutex_t mutexPool;
    pthread_cond_t notEmpty;
    pthread_t* threadIDs;
    pthread_t managerID;
    TaskQueue<T>* taskQ;
    int minNum;
    int maxNum;
    int busyNum;
    int liveNum;
    int exitNum;
    bool shutdown = false;
};

线程池定义

#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>

using namespace std;

template <typename T>
ThreadPool<T>::ThreadPool(int min, int max) {
    // 实例化任务队列
    do {
        taskQ = new TaskQueue<T>;
        if(taskQ == nullptr) {
            cout << "malloc threadIDs fail...\n";
            break;
        }

        threadIDs = new pthread_t[max];
        if(threadIDs == nullptr) {
            cout << "malloc threadIDs fail....\n";
            break;
        }
        memset(threadIDs, 0, sizeof(pthread_t) * max);
        minNum = min;
        maxNum = max;
        busyNum = 0;
        liveNum = min; // 和最小个数相等
        exitNum = 0;

        if(pthread_mutex_init(&mutexPool, NULL) != 0 ||
        pthread_cond_init(&notEmpty, NULL) != 0
        ) {
            cout << "mutex or condif init fail....\n";
            break;
        }

        shutdown = false;
        
        /*
        为什么要把this传给manager呢?
        因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,
        它不能访问类的非静态成员变量。
        因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个
        实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量
        */
        // 创建管理者线程和工作者线程
        pthread_create(&managerID, NULL, manager, this); // 第三个参数为管理者线程的任务函数
        for(int i = 0; i < min; i++) {
            pthread_create(&threadIDs[i], NULL, worker, this); // 第三个参数为工作的线程的任务函数
        }
        return;
    } while(0);

    // 释放资源
    if(threadIDs) delete []threadIDs;
    if(taskQ) delete taskQ;
}

template <typename T>
ThreadPool<T>::~ThreadPool() {

    // 关闭线程池
    shutdown = true;
    // 阻塞回收管理者线程
    pthread_join(managerID, NULL);
    // 唤醒阻塞的消费者线程
    for(int i = 0; i < liveNum; i++) { // 因为到了最后没有任务了,所以活着的线程都需要关闭
        pthread_cond_signal(&notEmpty);
    }
    // 释放堆内存
    if(taskQ) {
        delete taskQ;
    }
    if(threadIDs) {
        delete []threadIDs;
    }
    
    pthread_mutex_destroy(&mutexPool);
    pthread_cond_destroy(&notEmpty);
}

template <typename T>
void ThreadPool<T>::addTask(Task<T> task) {
    if(shutdown) {
        return;
    }
    // 添加任务
    taskQ->addTask(task);

    pthread_cond_signal(&notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
}

template <typename T>
int ThreadPool<T>::getBusyNumber() {
    pthread_mutex_lock(&mutexPool);
    int busyNum = this->busyNum;
    pthread_mutex_unlock(&mutexPool);
    return busyNum;
}

template <typename T>
int ThreadPool<T>::getAliveNumber() {
    pthread_mutex_lock(&mutexPool);
    int liveNum = this->liveNum;
    pthread_mutex_unlock(&mutexPool);
    return liveNum;
}

template <typename T>
void* ThreadPool<T>::worker(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg); // static_cast相当于c里面的强制类型转换
    while(1) {
        pthread_mutex_lock(&pool->mutexPool); // 访问线程池之前加锁
        // 当前任务队列是否为空
        while(pool->taskQ->taskNumber() == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if(pool->exitNum > 0) {
                pool->exitNum--;
                if(pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
                    pool->threadExit();
                }
            }
        }

        // 判断线程池是否被关闭了
        if(pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexPool); // 避免死锁
            pool->threadExit();
        }

        // 从任务队列中取出一个任务
        Task<T> task = pool->taskQ->takeTask();

        pool->busyNum++;
        // 因为任务队列可以无限大,所以生产者可以不用唤醒
        pthread_mutex_unlock(&pool->mutexPool); // 用完之后解锁

        printf("thread %ld start working...\n", pthread_self());
        task.function(task.arg);
        delete task.arg; // 
        task.arg = nullptr;

        printf("thread %ld end working...\n", pthread_self());

        pthread_mutex_lock(&pool->mutexPool);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexPool);
    }
    return NULL;
}

template <typename T>
void* ThreadPool<T>::manager(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while(!pool->shutdown) {
        // 每隔三秒钟检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->taskQ->taskNumber();
        int liveNumber = pool->liveNum;
        int busyNumber = pool->busyNum; // 取出忙的线程的数量
        pthread_mutex_unlock(&pool->mutexPool);
        

        // 添加线程
        // 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
        if(queueSize > liveNumber && liveNumber < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexPool);
            int count = 0;
            for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
                if(pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    count++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        // 销毁线程
        // 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
        // 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
        if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for(int i = 0; i < NUMBER; i++) {
                // 唤醒已经被阻塞的工作的线程,因为工作的线程无事可做,肯定是被阻塞在条件变量中
                pthread_cond_signal(&pool->notEmpty); 
            }
        }
    }
}

template <typename T>
void ThreadPool<T>::threadExit() {
    pthread_t tid = pthread_self(); // 获得当前线程的线程ID。
    for(int i = 0; i < maxNum; i++) {
        if(threadIDs[i] == tid) { // tid对应的线程要退出了
            threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

测试

#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

void taskFunc(void* arg) {
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main() {
    // 创建线程池
    ThreadPool<int> pool(3, 10);
    for(int i = 0; i < 100; i++) {
        int* num = new int(i + 100);
        pool.addTask(Task<int>(taskFunc, num));
    }

    sleep(20); // 等待子线程把任务处理完毕

    return 0;
}

g++ main.cpp -o main -lpthread

image-20240923173739851


原文地址:https://blog.csdn.net/liupang14159/article/details/142490119

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!