Linux线程(整理)
目录
静态的成员不能访问类内的成员变量或者成员函数(可以访问静态的属性或者函数可以)
这就是生产者消费者模型(进程池也类似:管道不就是任务队列吗:管道自带同步和互斥)
pthread(线程的介绍)
线程哪部分资源是独占的:线程上下文和栈
所有线程都是来自于一个进程
线程创建pthread_create
pthread_create()函数:创建线程 - C语言中文网 (biancheng.net)
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
参数说明
pthread_t *thread
:指向pthread_t
类型的指针,用于存储新创建线程的标识符,thread是返回线程ID。const pthread_attr_t *attr
:指向pthread_attr_t
类型的指针,用于指定线程的属性。通常设置为NULL
,表示使用默认的线程属性,attr:设置线程的属性,attr为NULL表示使用默认属性。void *(*start_routine) (void *)
:指向线程启动例程的函数指针,该函数将在新线程中执行,start_routine:是一个函数地址,线程启动后要执行的函数。void *arg
:传递给启动例程的参数的指针,该参数类型为void,
arg:传给线程启动函数的参数
。
返回值
0
:成功创建线程。- 非
0
:创建线程失败。
总结
pthread_create
用于创建一个新线程,该线程将执行指定的启动例程。- 新线程创建后,它将在其独立的执行路径上运行
start_routine
函数。 pthread_t
类型用于标识线程,它是一个无符号整数。- 创建线程时,可以通过
pthread_attr_t
指定线程的属性,例如线程的堆栈大小、调度策略等。默认属性可以使用pthread_attr_init()
初始化。 start_routine
函数可以接收一个void *
类型的参数,通常用于传递数据给新线程。pthread_create
函数是创建多线程编程的基础,它允许程序并行执行多个任务。
注意事项
- 线程的创建和同步需要谨慎处理,以避免死锁、竞争条件和其他并发问题。
- 在使用
pthread_create
创建线程后,主线程通常会等待新线程完成工作,可以使用pthread_join
或pthread_detach
实现这一点。 - 线程退出时,其堆栈应该被清理,以避免内存泄漏。
示例
以下是一个使用 pthread_create
创建线程的简单示例:
#include <pthread.h>
#include <stdio.h>
void *print_message_function(void *ptr) {
const char *message;
message = (const char *)ptr;
printf("%s\n", message);
return NULL;
}
int main(void) {
pthread_t thread_id;
const char *message = "Hello from thread";
// 创建新线程
pthread_create(&thread_id, NULL, print_message_function, (void *)message);
// 等待线程结束
pthread_join(thread_id, NULL);
return 0;
}
在这个例子中,print_message_function
是线程的启动例程,它接收一个指向字符串的指针并打印它。主线程调用 pthread_create
创建新线程,然后使用 pthread_join
等待它完成。
c语言中不能用void定义变量
void*可以,为了方便接收任意指针类型,与类型无关
void* 8个字节(64位)
参数介绍
threadRoutine中必须保证是void *
两个执行流
#include<iostream>
#include<unistd.h>
#include<pthread.h>
using namespace std;
//new thread
void* threadRoutine(void* args)
{
while(true)
{
cout<< "new thread,pid:" << getpid() <<endl;
sleep(2);
}
}
int main()
{
//主线程
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, nullptr);
cout<< sizeof(void*)<<endl;
cout<< sizeof(void)<<endl;
while(true)
{
cout<< "main thread,pid: " <<getpid() <<endl;
sleep(1);
}
return 0;
}
编译后
链接式报错
不是系统调用
g++找到自家的库,而pthread_create的库找不到
第三方库:要这样
单执行流不可能执行两个双循环
两个执行流,一个进程
两个轻量级进程(线程就是轻量级进程,没有特定的含义)
pid=lwp是主线程
不相等的那个说明是新创建出来的
任何一个线程被干掉了(kill lwp的),默认整个进程都会被干掉
main线程和new线程都可以调用show
show可以被多个执行流同时执行,show函数被重入了!!
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
using namespace std;
void show(const string&name)
{
cout<< name << "hello thread" << endl;
}
//new thread
void* threadRoutine(void* args)
{
while(true)
{
//cout<< "new thread,pid:" << getpid() <<endl;
show("new thread");
sleep(2);
}
}
int main()
{
//主线程
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, nullptr);
while(true)
{
//cout<< "main thread,pid: " <<getpid() <<endl;
show("main thread");
sleep(1);
}
return 0;
}
g_val,main中变new中也变,地址也一样
查看线程的脚本
while :;do ps -aL | head -1&&ps -aL | grep mythread; sleep 1;done
模拟一下\0错误
//new thread
void* threadRoutine(void* args)
{
while(true)
{
printf("new thread pid:%d,g_val:%d,&g_val:0x%p\n",getpid(),g_val,&g_val);
//cout<< "new thread,pid:" << getpid() <<endl;
//show("new thread");
sleep(10);
int a = 10;
a /= 0;
}
}
int main()
{
//主线程
pthread_t tid;
pthread_create(&tid, nullptr, threadRoutine, nullptr);
while(true)
{
printf("main thread pid:%d,g_val:%d,&g_val:0x%p\n",getpid(),g_val,&g_val);
//cout<< "main thread,pid: " <<getpid() <<endl;
//show("main thread");
sleep(1);
g_val++;
}
return 0;
}
测试结果
10秒后\0错误出现,都会崩溃
第一个参数
tid是一个很大的数字
第四个参数
要符合这里的标注void*,线程里的函数一定进行强转成想打印的类型
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
//new thread
void* threadRoutine(void* args)
{
const char* name = (const char*)args;
while(true)
{
printf("%s new thread pid:%d,g_val:%d,&g_val:0x%p\n",name, getpid(),g_val,&g_val);
//cout<< "new thread,pid:" << getpid() <<endl;
//show("new thread");
sleep(10);
int a = 10;
a /= 0;
}
}
int main()
{
//主线程
pthread_t tid;
//pthread_create(&tid, nullptr, threadRoutine, nullptr);
pthread_create(&tid, nullptr, threadRoutine, (void*)("ljwljw"));
while(true)
{
printf("main thread pid:%d,g_val:%d,&g_val:0x%p\n",getpid(),g_val,&g_val);
//cout<< "main thread,pid: " <<getpid() <<endl;
//show("main thread");
sleep(1);
g_val++;
}
return 0;
}
进程谁先运行?
主线程要最后退
线程的等待函数pthread_join
pthread_join()函数:等待线程执行结束 - C语言中文网 (biancheng.net)
用法:
pthread_join
是 POSIX 线程库(pthread)中的一个函数,用于一个线程等待另一个线程完成执行。在多线程编程中,pthread_join
是同步线程的常用方法之一。
介绍
在 C 语言的多线程编程中,线程可以创建为守护线程(daemon thread)或非守护线程。守护线程在创建后不会阻塞其父线程的终止,而非守护线程则会在其终止时阻塞父线程的终止。
pthread_join
函数允许父线程等待其子线程结束。这意味着,直到子线程完成执行,父线程才会继续执行。
用法总结
以下是如何使用 pthread_join
的步骤和示例:
1. 包含头文件
#include <pthread.h>
2. 定义线程函数
创建一个线程函数,该函数将在新线程中执行。
void* thread_function(void* arg) {
// 执行线程任务
return NULL;
}
3. 创建线程
使用 pthread_create
函数创建线程。
pthread_t thread_id;
int rc = pthread_create(&thread_id, NULL, thread_function, NULL);
if (rc) {
// 错误处理
perror("pthread_create");
return 1;
}
4. 等待线程结束
使用 pthread_join
函数等待线程结束。
void* status;
rc = pthread_join(thread_id, &status);
if (rc) {
// 错误处理
perror("pthread_join");
return 1;
}
示例代码
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void* thread_function(void* arg) {
printf("Hello from thread!\n");
return NULL;
}
int main() {
pthread_t thread_id;
int rc;
rc = pthread_create(&thread_id, NULL, thread_function, NULL);
if (rc) {
perror("pthread_create");
return 1;
}
printf("Waiting for thread to finish...\n");
pthread_join(thread_id, NULL);
printf("Thread finished!\n");
return 0;
}
注意事项
- 当父线程调用
pthread_join
时,它将阻塞,直到指定的线程完成执行。 - 如果父线程在子线程之前结束,子线程将自动转换成守护线程,它将在父线程终止后立即结束。
- 如果在调用
pthread_join
之前,父线程和子线程已经分离(即父线程已经继续执行),则pthread_join
将返回EDEADLK
错误。
使用 pthread_join
可以确保所有子线程都已完成执行,这对于正确管理线程和避免数据竞争等问题非常重要。
例子:
main线程等待new线程退出后自己也退出了(可以保证主线程最后退出了)
测试结果
要想通过pthread_join拿到一个新线程退出时的退出结果,那么我们需要使用叫做二级指针
//new thread
void* threadRoutine(void* args)
{
const char* name = (const char*)args;
int cnt = 5;
while(true)
{
printf("%s new thread pid:%d,g_val:%d,&g_val:0x%p\n",name, getpid(),g_val,&g_val);
//cout<< "new thread,pid:" << getpid() <<endl;
//show("new thread");
sleep(1);
// int a = 10;
// a /= 0;
cnt--;
if(cnt == 0) break;
}
return (void*)1;//走到这默认线程退出了
}
int main()
{
//主线程
pthread_t tid;
//pthread_create(&tid, nullptr, threadRoutine, nullptr);
pthread_create(&tid, nullptr, threadRoutine, (void*)("ljwljw"));
// while(true)
// {
// printf("main thread pid:%d,g_val:%d,&g_val:0x%p\n",getpid(),g_val,&g_val);
// //cout<< "main thread,pid: " <<getpid() <<endl;
// //show("main thread");
// sleep(1);
// g_val++;
// }
//sleep(7);
void* retval;
pthread_join(tid, &retval);
cout<< "main thread quit ...,ret:" << (long long int)retval <<endl;
return 0;
}
int4个字节,void*8个字节发生截断了,要用long long int
pthread_exit仅代表线程终止
exit不能用来终止线程,直接调用exit,exit是用来终止进程的!!不能用来直接终止线程
phread_cancel取消某一个线程
新线程创建一秒后就被取消了,取消后的返回值是-1了
写一个线程做数据计算
大致思路
传类对象
这是结果类对象
#include<iostream>
#include<pthread.h>
#include<string>
using namespace std;
class Request
{
public:
Request(int start, int end, const string& threadname)
:start_(start), end_(end), threadname_(threadname)
{}
public:
int start_;
int end_;
string threadname_;
};
class Response
{
public:
Response(int result, int exitcode)
:result_(result), exitcode_(exitcode)
{}
public:
int result_; //计算结果
int exitcode_; //计算结果是否可靠
};
void *sunCount(void *args)
{
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, sunCount, nullptr);
pthread_join(tid, nullptr);
return 0;
}
创建完线程后,这样传就可以了
在线程执行函数里进行强转
构造一个request对象,名字叫thread 1
在等待函数中拿出结果,在这里接收
#include<iostream>
#include<pthread.h>
#include<string>
#include<unistd.h>
using namespace std;
class Request
{
public:
Request(int start, int end, const string& threadname)
:start_(start), end_(end), threadname_(threadname)
{}
public:
int start_;
int end_;
string threadname_;
};
class Response
{
public:
Response(int result, int exitcode)
:result_(result), exitcode_(exitcode)
{}
public:
int result_; //计算结果
int exitcode_; //计算结果是否可靠
};
void *sunCount(void *args)
{
Request* rq = static_cast<Request*>(args);
Response* rsp = new Response(0,0);
for(int i = rq->start_; i < rq->end_; i++)
{
rsp->result_ +=i;
usleep(100000);
}
//记得释放
delete rq;
return rsp;
}
int main()
{
pthread_t tid;
Request* rq = new Request(1, 100, "thread 1");
pthread_create(&tid, nullptr, sunCount, rq);
void* ret;//线程结束后,在等待这接收结果
pthread_join(tid, &ret);//&ret是二级指针
Response* rsp = static_cast<Response*>(ret);
cout<< "rsp->result:" << rsp->result_ << " , exitcode: " << rsp->exitcode_ <<endl;
//记得释放
delete rsp;
return 0;
}
堆空间也是线程共享的
测试
语言本身多线程多线程
目前,我们的原生线程,pthread库,原生线程库
C++11语言本身也已经支持多线程了多线程
c++线程
头文件
#include <thread>
在 C++ 中,线程的创建和管理主要依赖于标准库中的 <thread>
头文件提供的功能。C++11 引入了新的线程库,使得创建和管理线程变得更加简单和直观。
创建线程
在 C++ 中创建线程主要有以下几种方式:
1. 使用 std::thread
类
这是最常见的方式,使用 std::thread
类可以轻松地创建线程。
#include <iostream>
#include <thread>
void threadFunction() {
std::cout << "Thread is running..." << std::endl;
}
int main() {
std::thread t(threadFunction);
std::cout << "Main thread is running..." << std::endl;
// 等待线程完成
t.join();
std::cout << "Thread finished." << std::endl;
return 0;
}
在这个例子中,threadFunction
将在一个新线程中运行,而 main
函数将继续执行。t.join()
调用会阻塞 main
函数,直到 threadFunction
完成。
2. 使用 lambda 表达式
你可以直接在 std::thread
构造函数中使用 lambda 表达式来启动线程。
#include <iostream>
#include <thread>
int main() {
std::thread t([]() {
std::cout << "Thread is running..." << std::endl;
});
std::cout << "Main thread is running..." << std::endl;
t.join();
std::cout << "Thread finished." << std::endl;
return 0;
}
3. 使用线程函数和参数
你也可以传递参数给线程函数,这需要使用可调用对象(如函数指针、lambda 或函数对象)。
#include <iostream>
#include <thread>
void threadFunction(int value) {
std::cout << "Thread is running with value: " << value << std::endl;
}
int main() {
int value = 42;
std::thread t(threadFunction, value);
std::cout << "Main thread is running..." << std::endl;
t.join();
std::cout << "Thread finished." << std::endl;
return 0;
}
线程同步
C++ 提供了几种机制来同步线程,包括互斥锁(mutexes)、条件变量(condition variables)、原子操作(atomics)等。
互斥锁
互斥锁用于防止多个线程同时访问共享资源。
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void print_block(int n, char c) {
mtx.lock();
// 当多个线程访问这个代码块时,它们将按顺序执行
std::cout << n << " " << c << std::endl;
mtx.unlock();
}
int main() {
std::thread t1(print_block, 1, 'A');
std::thread t2(print_block, 2, 'B');
t1.join();
t2.join();
return 0;
}
条件变量
条件变量用于线程之间的同步,特别是在生产者-消费者问题中。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void do_work(std::condition_variable& cv, std::unique_lock<std::mutex>& lck) {
lck.lock();
while (!ready) {
cv.wait(lck);
}
std::cout << "Work is ready!" << std::endl;
lck.unlock();
}
int main() {
std::thread t(do_work, std::ref(cv), std::ref(mtx));
// 在这里准备工作
std::this_thread::sleep_for(std::chrono::seconds(1));
ready = true;
cv.notify_one();
t.join();
return 0;
}
总结
C++11 引入的线程库简化了多线程编程,使得创建和管理线程变得更加容易。使用用 std::thread
类可以轻松地创建线程,并通过互斥锁、条件变量等机制来同步线程。了解和正确使用这些工具对于编写高效的并发程序至关重要。
没有-lpthread编译不过去
#include<iostream>
#include<thread>
#include<unistd.h>
using namespace std;
void threadrun()
{
while(true)
{
cout<< "I am a new thread for C++" <<endl;
sleep(2);
}
}
int main()
{
thread t1(threadrun);
//等待new thread结束
t1.join();
return 0;
}
线程ID以及进程地址空间布局
自己的线程id
pthread_self
是 POSIX 线程库(pthread)中一个用于获取当前执行线程的线程标识符的函数。在 C 语言中,线程标识符是一个 pthread_t
类型的值,该类型是线程库中定义的一个无符号整数。
pthread_self
(自己的线程id)
以下是如何使用 pthread_self
函数的示例:
包含头文件
#include <pthread.h>
使用 pthread_self
pthread_t my_thread_id = pthread_self();
或者,你可以在表达式中直接使用它:
pthread_t current_thread_id = pthread_self();
printf("Current thread ID: %lu\n", (unsigned long)current_thread_id);
这里,(unsigned long)current_thread_id
是为了将 pthread_t
类型的值转换为 unsigned long
类型,这样就可以用 %lu
格式说明符来输出它,因为 pthread_t
可能是一个较大的整数类型。
返回值
pthread_self
函数返回当前执行线程的标识符,如果成功,则返回 pthread_t
类型的值,如果失败,则返回 PTHREAD_ERRORCHECK
或者 PTHREAD_NULL
。
- 如果线程库配置为
PTHREAD_ERRORCHECK
模式,并且发生错误,则返回PTHREAD_ERRORCHECK
。 - 如果线程库配置为
PTHREAD_CANCEL_DISABLE
模式,并且当前线程被取消,则返回PTHREAD_CANCELLED
。 - 在其他情况下,如果
pthread_self
失败,则返回PTHREAD_NULL
。
示例
下面是一个简单的示例,展示如何使用 pthread_self
:
#include <stdio.h>
#include <pthread.h>
void* thread_function(void* arg) {
pthread_t thread_id = pthread_self();
printf("Thread ID: %lu\n", (unsigned long)thread_id);
return NULL;
}
int main() {
pthread_t thread_id;
// 创建线程
if (pthread_create(&thread_id, NULL, thread_function, NULL) != 0) {
perror("pthread_create");
return 1;
}
// 等待线程结束
pthread_join(thread_id, NULL);
return 0;
}
在这个示例中,主线程创建了一个新线程,并在新线程的线程函数中打印出了新线程的线程标识符。然后,主线程等待新线程完成其执行。
把id转成16进程然后打印出来
全局变量可以被多个线程同时使用的,函数也可以被多个线程调用
创建一批线程并且把tid放进vector里
所有线程都进行等待了,不知道谁先进行的,但可以保证一定是主线程是最后退出的
为了td传入线程里的时候不是同一个内容
#include <iostream>
#include <vector>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
#define NUM 10
class threadDate
{
};
void *threadRoutine(void *args)
{
threadDate* td = static_cast<threadDate*>(args);
int i = 0;
while(i < 10)
{
cout<< "pid: " <<getpid()<<",";
}
return nullptr;
}
int main()
{
vector<pthread_t> tids;
for (int i = 0; i < NUM; i++)
{
pthread_t tid;
threadDate* td = new threadDate;
pthread_create(&tid, nullptr, threadRoutine, td);
//插入tid进入vector
tids.push_back(tid);
}
for(int i = 0; i < NUM; i++)
{
pthread_join(tids[i],nullptr);
}
return 0;
}
进行初始化
补充
测试
所有的线程都是执行threadRoutine,每个线程都是在不同的栈结构,都有独立的栈结构
每一个线程都有独立的栈,主线程在栈区,新线程在共享区,但都在地址空间里
每一个线程都会有自己独立的栈结构!
其实线程和线程之间,几乎没有密码
线程的栈上的数据,也是可以被其他线程看到并访问的
全局变量是被所有的线程同时看到并访问的
要一个私有的全局变量呢(__thread)
__thread(线程级别的全局变量,互不干扰)(只能定义内置类型,不能用来修饰自定义类型)
分离线程pthread_detach
①thread之可连接和分离的线程 | Joinable and Detached Threads - Smah - 博客园 (cnblogs.com)
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源
pthread_detach
是 POSIX 线程库(pthread)中的一个函数,用于将一个已经创建的非守护(joinable)线程标记为可被 detaching。这意味着当该线程完成执行时,它会自动释放其资源,而不需要另一个线程来调用 pthread_join
来等待它的结束。
用法
以下是如何使用 pthread_detach
的步骤:
1. 包含头文件
#include <pthread.h>
2. 创建线程
首先,使用 pthread_create
创建一个线程。
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, thread_function, NULL) != 0) {
perror("pthread_create");
return 1;
}
3. 标记线程为可分离
使用 pthread_detach
将创建的线程标记为可分离。
if (pthread_detach(thread_id) != 0) {
perror("pthread_detach");
return 1;
}
4. 继续执行
在调用 pthread_detach
后,当前线程可以继续执行,而不会因为 pthread_detach
而被阻塞。
注意事项
- 在调用
pthread_detach
之前,不应该调用pthread_join
,因为这会导致行为未定义。 - 如果尝试 detaching 已经被
pthread_join
等待的线程,则会导致未定义的行为。 - 如果守护线程(daemon thread)尝试调用
pthread_detach
,则会导致未定义的行为。 - 当线程被分离后,它将无法再次被分离,并且如果该线程在分离后还继续运行,它将不会释放其资源。
示例
下面是一个简单的示例,演示了如何使用 pthread_detach
:
#include <stdio.h>
#include <pthread.h>
void* thread_function(void* arg) {
// 执行线程任务
printf("Thread is running...\n");
// 线程任务完成后自然结束
return NULL;
}
int main() {
pthread_t thread_id;
// 创建线程
if (pthread_create(&thread_id, NULL, thread_function, NULL) != 0) {
perror("pthread_create");
return 1;
}
// 将线程标记为可分离
if (pthread_detach(thread_id) != 0) {
perror("pthread_detach");
return 1;
}
// 主线程继续执行,而不会等待子线程结束
printf("Main thread is still running...\n");
// 假设主线程运行一段时间后结束
sleep(1);
return 0;
}
在这个示例中,主线程创建了一个子线程,然后立即将其标记为可分离。这意味着子线程将在其任务完成后自动释放资源,而主线程可以继续执行或立即退出。
上面的是主线程给分离,这是自己分离自己,有一样的效果
如果分离了,就说明主线程就不想等待新线程了
线程互斥
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题
用多线程模拟抢票
定义10个线程,1000张票
定义一个线程数据类
之所以Date写成一个类就是为了以后方便往线程里加东西
tid放进vecotr
所有线程的数据,把所有的参数数据放进vector,并传给线程执行函数getTicket。
等待线程和释放new的空间
写抢票
强转类型要记得看main线程中传的类型
测试代码
开始抢票
先拿到线程的名字
然后确定可以抢票的范围,必须要大于0才能开始进行抢票
设置4个线程并发的访问的这里的抢票逻辑,也会共同的访问共同的全局变量票数
#include<iostream>
#include<pthread.h>
#include<string>
#include<vector>
#include<unistd.h>
using namespace std;
#define NUM 4 //线程个数
int tickets = 1000;
//线程数据
class threadDate
{
public:
threadDate(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
void *getTicket(void *args)
{
threadDate* td = static_cast<threadDate*>(args);
//先拿到线程的名字
const char*name = td->threadname.c_str();
while(true)
{
if(tickets > 0)
{
usleep(1000);
printf("who:%s, get a ticket: %d\n", name, tickets);
tickets--;
}
else break;
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadDate*> thread_datas;
for(int i = 1; i <= NUM; i++)
{
pthread_t tid;
threadDate* td = new threadDate(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTicket, thread_datas[i-1]);
tids.push_back(tid);
}
for(auto e : tids)
{
pthread_join(e, nullptr);
}
for(auto e : thread_datas)
{
delete e;
}
return 0;
}
代码测试:
出问题了,抢到负数了
为何会出现这种情况
因为共享数据-----》数据不一致问题,肯定和多线程并发访问是有关系的
对一个全局变量进行多线程并发---/++操作是不安全的
计算就是将内存中的数据读到cpu中计算(拷贝的过程)
当票数为1时,可能有多个线程同时进行tickets判断
在tickets > 0,读完后,下面的tickets也需要重新读取数据,可能被别的线程减成0了,但有的线程已经进来了,而且还需要重新读数据,所以就会--到负数
怎么解决
进行加锁
定义成全局的这种,可以不用destroy
所以要保证每一个线程它在并发访问某种资源的时候,它首先得保证它们访问同一把锁
定义一个锁变量,在mian函数栈上定义的
给锁初始化
// 定义锁
pthread_mutex_t lock;
//初始化锁
pthread_mutex_init(&lock, nullptr);
不想要了就释放
在Date类中定义一下锁
定义类时,就要传递一个锁了,因为肯定要传地址,所以要用*,&也是可以的
把锁传进去
加锁
参数就是锁的地址
- 全局变量叫做临界资源
- 把访问临界资源的一小部分叫做临界区
- 加锁的本质使用时间换安全
加锁和解锁的位置,临界代码越少越好,还要确保两个条件下都要能解锁
申请锁成功,才能往后执行,不成功就是阻塞在这里等待
可能有的线程竞争力比较强,所以有可能有的票被一个线程抢了
测试:
没有这个usleep,就很可能有饥饿问题
测试:无usleep
如何解决
1.外面来的,必须排队
2.出来的人,不能立马重新申请锁,必须排到队列的尾部让所有的线程(人)获取锁(钥匙)按照一定的顺序。按照一定的顺序性获取资源---同步!
锁本身就是共享资源!!
所以,申请锁和释放锁本身就被设计成为了原子性操作(如何做到的?)
原子性
信号量本身就是原子性的 也就是说你多线程共享这个资源的时候 原子性确保了在某个线程执行一个操作的过程中,其他线程无法干扰这个操作
全局锁
不用初始化和释放
修改一下
互斥锁
解锁可以用其他线程解锁,概率很小
单独设置一个线程不设置锁(是不可以的,BUG)
锁的应用,封装
要注意的是锁还是要从外部传入的
代码如下
#pragma once
#include <pthread.h>
#include <iostream>
using namespace std;
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):lock_(lock)
{}
void Lock()
{
pthread_mutex_lock(lock_);
}
void Unlock()
{
pthread_mutex_unlock(lock_);
}
~Mutex()
{}
private:
pthread_mutex_t *lock_;
};
锁的警卫(guard)
走到这对象已经存在了,Mutex里的属性和成员就已经可以用了
main.c中改进
这部分重新写
void *getTicket(void *args)
{
threadDate *td = static_cast<threadDate *>(args);
// 先拿到线程的名字
const char *name = td->threadname_.c_str();
while (true)
{
//pthread_mutex_lock(td->lock_);//申请锁成功,才能往后执行,不成功就是阻塞在这里等待
pthread_mutex_lock(&lock);//申请锁成功,才能往后执行,不成功就是阻塞在这里等待
if (tickets > 0)
{
usleep(1000);
printf("who:%s, get a ticket: %d\n", name, tickets);
tickets--;
pthread_mutex_unlock(&lock);//解锁
}
else
{
pthread_mutex_unlock(&lock);
break;
}
//usleep(13);//我们抢到了票,我们会立马去抢下一张票吗?其实多线程还要执行得到票之后的后续动作
}
printf("%s ... quit\n", name);
return nullptr;
}
全局的lock,会自动释放自动解锁
测试:
代码
main.cc
#include <iostream>
#include <pthread.h>
#include <string>
#include <vector>
#include <unistd.h>
#include "mutex.hpp"
using namespace std;
#define NUM 4 // 线程个数
int tickets = 1000;
//全局锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 线程数据
class threadDate
{
public:
threadDate(int number /*, pthread_mutex_t *mutex*/)
{
threadname_ = "thread-" + to_string(number);
//lock_ = mutex;
}
public:
string threadname_;
//pthread_mutex_t *lock_;
};
void *getTicket(void *args)
{
threadDate *td = static_cast<threadDate *>(args);
// 先拿到线程的名字
const char *name = td->threadname_.c_str();
while (true)
{
LockGuard lockguard(&lock);//临时的LockGuard对象,RAII风格的锁
//pthread_mutex_lock(td->lock_);//申请锁成功,才能往后执行,不成功就是阻塞在这里等待
//pthread_mutex_lock(&lock);//申请锁成功,才能往后执行,不成功就是阻塞在这里等待
if (tickets > 0)
{
usleep(1000);
printf("who:%s, get a ticket: %d\n", name, tickets);
tickets--;
//pthread_mutex_unlock(&lock);//解锁
}
else
{
//pthread_mutex_unlock(&lock);
break;
}
//usleep(13);//我们抢到了票,我们会立马去抢下一张票吗?其实多线程还要执行得到票之后的后续动作
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
// 定义锁
pthread_mutex_t lock;
//初始化锁
pthread_mutex_init(&lock, nullptr);
vector<pthread_t> tids;
vector<threadDate *> thread_datas;
for (int i = 1; i <= NUM; i++)
{
pthread_t tid;
//threadDate *td = new threadDate(i, &lock);
threadDate *td = new threadDate(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
tids.push_back(tid);
}
for (auto e : tids)
{
pthread_join(e, nullptr);
}
for (auto e : thread_datas)
{
delete e;
}
pthread_mutex_destroy(&lock);
return 0;
}
mutex.hpp
#pragma once
#include <pthread.h>
#include <iostream>
using namespace std;
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):lock_(lock)
{}
void Lock()
{
pthread_mutex_lock(lock_);
}
void Unlock()
{
pthread_mutex_unlock(lock_);
}
~Mutex()
{}
private:
pthread_mutex_t *lock_;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock):mutex_(lock)
{
//走到这对象已经存在了,Mutex里的属性和成员就已经可以用了
mutex_.Lock();
}
~LockGuard()
{
mutex_.Unlock();
}
private:
Mutex mutex_;
};
线程安全
可重入VS线程安全
一个函数被多个线程同时调用就叫做重入
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
可重入的就是线程安全的
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
不可重入的,可能会有问题
死锁
一个也可能死锁
死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
如何解决死锁问题
理念 破坏4个必要条件 只需要一个不满足就可以
第一个(pthread_mutex_lock):失败了会阻塞等待
第二个(pthread_mutex_trylock)这个方法申请锁,申请失败会立即返回
同步问题
条件变量
提供简单的通知机制
同样可以定义成全局
唤醒所有线程和单个线程(...cond)
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
这里的i不用取地址,这样可以保证是独立的
自己分离自己,就不需要等待释放了
测试
这样的情况(这样就变成全5了,&i的话,新线程和主线程访问的就是一个i了,如果mian中先跑了,那么i要是到力扣5,新线程中的number也就变成了5,如果想共享就&)
因为条件变量是和锁不分的,所以要创建一个锁
一个锁和一个条件变量(全局的)
加上锁了
为什么把等待放在加锁里面,因为不想让你一进来就一股脑的进行cout打印,要有顺序的进行打印
先加的锁再等待,在等待的时候,其他线程根本就没有机会去持有锁,等待的时候,这把锁是被线程带走的
目前情况,线程拿着锁一直在等待,怎么唤醒呢
唤醒一个线程
测试:
唤醒了一批线程
测试
怎么知道要让一个线程去休眠
代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int cnt = 0;
void *Count(void *args)
{
//自己分离自己,就不需要等待释放了
pthread_detach(pthread_self());
uint64_t number = (uint64_t)args;
while(true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
//不管临界资源的状态情况
cout<< "pthread: " << number << " cnt: " << cnt++ << endl;
pthread_mutex_unlock(&mutex);
sleep(3);
}
}
int main()
{
for(uint64_t i = 0; i < 5; i++)
{
pthread_t tid;
pthread_create(&tid, nullptr, Count, (void*)i);
}
while(true)
{
sleep(1);
//唤醒一个
//pthread_cond_signal(&cond);//唤醒再cond的等待队列中等待的一个线程,默认都是第一个
//唤醒一批
pthread_cond_broadcast(&cond);
cout<< "signal one thread..." <<endl;
}
//让主线程不退出
while(true) sleep(1);
return 0;
}
生产消费模型
理论
实现一个生产消费模型,类似于管道
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)
用C++中的queue充当队列
编译main
先创建生产者和消费者线程
#include "BlockQueue.hpp"
#include<iostream>
using namespace std;
//生产者
void* Productor(void* args)
{
}
//消费者
void* Consumer(void* args)
{
}
int main()
{
pthread_t c, p;
pthread_create(&c, nullptr, Productor, nullptr);
pthread_create(&p, nullptr, Consumer, nullptr);
//等待线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
两个线程和一个阻塞队列
怎么样看到同一份资源呢,这样把对象穿过去
消费是要从队列中拿数据,生产是往队列里放数据
设计阻塞队列
不知道生产什么在队列里,直接用个模版
#pragma once
#include<queue>
#include<iostream>
using namespace std;
template<class T>
class BlockQueue
{
public:
private:
queue<T> q;
};
定义一个极值,创建的最大空间
也需要保护它,定义一把锁,来保护临界资源 ,还要维护同步关系,生产满了就让消费者来消费,消费完了就让生产者生产
还要维护同步关系,用....cond_t
必要的接口
构造函数实现一下,缺省一下
先模拟一下最简单的生产
锁和条件变量初始化了
释放
push的时候怎么确保没有在pop呢
加锁保护起来
要先确保生产条件满足
如果这里这样写的话,就是拿着锁进行等待的,就没有人能消费了,在等的时候必须释放锁
但...._cond会自动释放锁,让自己处于阻塞状态,这也就是参数(wait里的第二个参数)会带锁的原因
生产的代码这就是
为什么判断条件不放在锁的外面,因为:
这个cond_变量,pop中不能再用这个了,生产和消费不能用同一个,我们在进行唤醒的时候,唤醒的是哪一个呢,唤醒就不确定了
增加一个变量
谁来唤醒呢
在生产的这里唤醒,生产了以后就唤醒消费者来消费
同理消费者也是
暂时这样测试,有一定的同步关系的,让消费慢一点
改正一些小错误
返回T&,接收时也要T&
生产的很慢时测试结果就是生产几个消费者就拿几个
改进一下
定义一个低水位和高水位,生产者不到低水位,消费者不消费,到了高水位,生产者就不生产了
如果大于高水位就唤醒消费者
生产了超过high_water,就开始唤醒消费者进行消费,消费到一定数量,再唤醒生产者生产,自己进行等待,这样就形成了生产一批消费一批
生产者对应的数据从哪里来
模拟生产者生产数据,要先获取到数据才能生产数据
获取到的数据,也需要时间来进行获取和生产
消费者这里
在阻塞队列里放对象或者任务
构造一个task,oper_是要进行什么操作
exitcode_出问题,退出码
一个除0错误一个模0错误
其他符号时
获得结果
更改类型
生产了一个任务
这样用户就可以看到生产的任务了
消费者这边,调用了默认拷贝构造,这里消费者拿到了数据,还要对数据进行加工处理
打印出来
这样就形成了一个线程,制造对应的任务,然后把它生产到阻塞队列里,另一个线程,消费队列里,消费对应的任务,并做计算
测试让生产慢一点
看一个异常的情况: 随机值要产生0
仿函数
被误唤醒了呢
当阻塞队列block list已经满了的时候,消费者pop一个后,...._cond_br....全给生产者唤醒了(误唤醒了), 就可能会多生产
push这里等待的时候要释放锁,唤醒的时候要加锁的
做到防止伪唤醒的情况
叫个while,就算是多唤醒的从wait这里返回后,有别的线程争抢到锁,但maxcap==size()的情况继续判断,就可以预防这种情况
误唤醒生产者就可能会出现多生产者,单消费者的情况
多消费者多生产者(用的也是一个阻塞队列)
总代码
BlockQueue.hpp
#pragma once
#include<queue>
#include<iostream>
#include<unistd.h>
using namespace std;
template<class T>
class BlockQueue
{
static const int defalutnum = 5;
public:
BlockQueue(int maxcap = defalutnum) :maxcap_(maxcap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
low_water_ = maxcap_/3;
high_water_ = (maxcap_*2)/3;
}
//谁来唤醒呢?
T& pop()
{
pthread_mutex_lock(&mutex_);
// if(q.size() == 0)//因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过在临界资源内部判断的。
// {
// //如果线程wait时,被误唤醒了呢
// pthread_cond_wait(&c_cond_, &mutex_);//你是持有锁的!!!1.调用的时候,自动释放,因为唤醒而返回的时候,重新持有锁
// }
while(q.size() == 0)
{
pthread_cond_wait(&c_cond, &_mutex);
}
const T& out = q.front();//你想消费,就直接能消费吗?不一定,你得先确保消费条件满足
q.pop();
// if(q.size() < low_water_)
// {
// pthread_cond_signal(&p_cond_);
// }
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T& in)
{
pthread_mutex_lock(&mutex_);
while(q.size() == maxcap_)
{
pthread_cond_wait(&p_cond_, &mutex_);//1.调用的时候,自动释放锁
}
//1.队列没满 2.被唤醒
q.push(in);
if(q.size() > high_water_)
{
pthread_cond_signal(&c_cond_);
}
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
queue<T> q; //共享资源
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
int low_water_;
int high_water_;
};
Task.hpp
#pragma once
#include<iostream>
#include<string>
using namespace std;
string opers = "+-*/%";
enum
{
Div_zero = 1,
Mod_zero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op):data1_(x), data2_(y), oper_(op),
result_(0), exitcode_(0)
{}
void run()
{
switch(oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = Div_zero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = Mod_zero;
else result_ = data1_ / data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
void operator()()
{
run();
}
string GetResult()
{
string r = to_string(data1_);
r += oper_;
r += to_string(data2_);
r += '=';
r += to_string(result_);
r += "[code: ";
r += to_string(exitcode_);
r += "]";
return r;
}
string GetTask()
{
string r = to_string(data1_);
r += oper_;
r += to_string(data2_);
r += "=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
// 生产者
void *Productor(void *args)
{
int len = opers.size();
//int data = 0;
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//模式生产者生产数据
int data1 = rand()%10 + 1;//[1,10]
usleep(10);
int data2 = rand() % 10 + 1;
char op = opers[rand() % len];
Task t(data1, data2, op);
//生产
bq->push(t);
cout<< "生产了一个数据:" << t.GetTask() << endl;
sleep(1);
}
}
// 消费者
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//消费
Task t = bq->pop();
//sleep(2);
//拿任务做计算
t();
cout<< "处理任务" << t.GetTask() << "运行结果是:" << t.GetResult() << endl;
}
}
int main()
{
srand(time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, Productor, bq);
pthread_create(&p, nullptr, Consumer, bq);
// 等待线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
多消费,多生产
多消费,多生产
释放
打印出来看看,测试一下
让生产慢一点
为什么添加几个for循环就可以变成多线程的了呢(多生产多消费)
多生产多消费模型,有的进行1,有的进行2,并发度就上来了
信号量(本质计数器)
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于 线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
p v操作
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序 (POSIX信号量):
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
解决办法
空一个位置
要遵守的三个原则
环形队列
先写单生产单消费
这样就可以都看到同一份资源rq
#include<iostream>
#include "RingQueue.hpp"
#include<pthread.h>
#include<unistd.h>
using namespace std;
//生产者
void* Productor(void* args)
{
RingQueue* rq = static_cast<RingQueue*>(args);
while(true)
{
cout<< "Productor" <<endl;
sleep(1);
}
return nullptr;
}
//消费者
void* Consumer(void* args)
{
RingQueue* rq = static_cast<RingQueue*>(args);
while(true)
{
cout<< "Consumer" <<endl;
sleep(1);
}
return nullptr;
}
int main()
{
RingQueue *rq = new RingQueue();
pthread_t c, p;
pthread_create(&p, nullptr, Productor, rq);
pthread_create(&c, nullptr, Consumer, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
环形队列就是这个
把下标弄上
信号量
构造一下,空间预留5个
初始化信号量
第一个参数:信号量
2:0
3:初始值
第一个数据信号量为0,因为一开始是没有数据的,而空间信号量是cap(满的)
对信号量进行销毁
入数据
在生产中把信号量进行p v操作,P V可以理解为,P减V加
在环形队列中生产数据
消费的行为,P V可以理解为,P减V加
消费和生产都有可能先被调度,但一定是生产先运行
因为在初始化信号量的时候,只有生产者能P成功
main.cc中
重要步骤
生产者里
消费者
在生产者这加上3秒,那么消费者先消费数据,pop
然后进行P操作
生产者3秒之后才跑
虽然消费者代码先执行,但消费者必须等生产者3秒之后生产了才能进行消费
多生产多消费 定义两把锁
初始化
销毁
把加锁和解锁封装一下
竞争信号量是不冲突的,信号量申请是原子的
信号量本身就是原子性的 也就是说你多线程共享这个资源的时候 原子性确保了在某个线程执行一个操作的过程中,其他线程无法干扰这个操作
生产着加锁,保护的是下标和对下标的访问
把这些代码放P V之间,是为了多生产多消费出现问题,消费者和生产者会冲突的部分也就这6行
生产者加锁
这样的话来多少生产者都没事了,因为只有一个生产者能拿到锁,只有一个消费者能竞争成功
然后P,进行申请信号量,没有信号量就等着,持有锁等待着
生产者中,先加锁还是先申请信号量呢
2比较好,为什么呢
2:把申请信号量放在外部,可以在一定程度上在多线程申请时,让申请信号量和申请锁的时间变成并行的,可以提高效率,先去申请信号量不会出现错误,因为信号量是原子的,抢到了证明你有这个资格,有资格才把锁给你,那么在我访问期间(P和Lock之后的),陆陆续续还有其他线程过来,过来没事,先去申请信号量,只要你申请信号量,资源给你预定了,下次直接加锁。
1就不行:加锁才能申请信号量,申请信号量的永远只有一个,不能很好的提高并发度
消费者:同理
这样就支持多生产多消费了
信号量相当于预定,先预定在排队效率比较高
实际上进入环形队列的也只有一个消费者或一个生产者,为空时,只有生产者一个,消费者进不去,为满时,生产者进不去,都不是那么生产和消费可以都在
测试用例
添加任务,生产任务
都改成Task
生产者这的修改
构造重载一下,无参构造
不加这个就会错
总代码
RingQueue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<semaphore.h>
using namespace std;
const static int defaultcap = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex &mutex)
{
pthread_mutex_ulock(&mutex);
}
public:
RingQueue(int cap = defaultcap):ringqueue_(cap), c_step(0), p_step(0),cap_(cap)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_t_init(&c_mutex, nullptr);
pthread_mutex_t_init(&p_mutex, nullptr);
}
void Push(const T&in)//生产
{
// //1
// Lock(p_mutex);
// P(pspace_sem_);
//2
P(pspace_sem_);
Lock(p_mutex);
ringqueue_[p_step] = in;
V(cdata_sem_);
//处理下标
p_step++;
p_step %= cap_;
Unlock(p_mutex);
}
void Pop(T* out) //消费
{
P(cdata_sem_);
Lock(c_mutex);
*out = ringqueue_[c_step];
V(pspace_sem_);
//下标后移,维持环形特性
c_step++;
c_step %= cap_;
Unlock(c_mutex);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_t_destroy(&c_mutex,);
pthread_mutex_t_destroy(&p_mutex,);
}
private:
vector<T> ringqueue_;
int cap_; //确保下表不大于总数
int c_step; //消费者下标
int p_step; //生产者下标
sem_t cdata_sem_; //消费者关注的数据资源
sem_t pspace_sem_;//生产者关注的数据资源
pthread_mutex_t c_mutex;
pthread_mutex_t p_mutex;
};
main.cc
#include<iostream>
#include "RingQueue.hpp"
#include<pthread.h>
#include<unistd.h>
using namespace std;
//生产者
void* Productor(void* args)
{
sleep(3);
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
//1.获取数据
int data = rand()%10 + 1;
//2.生产数据
rq->Push(data);
cout<< "Productor data done, data is :" << data << endl;
sleep(1);
}
return nullptr;
}
//消费者
void* Consumer(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
//1.消费数据
int data = 0;
rq->Pop(&data);
//2.处理数据
}
return nullptr;
}
int main()
{
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&p, nullptr, Productor, rq);
pthread_create(&c, nullptr, Consumer, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
线程池 池化技术
/*threadpool.h*/
/* 线程池:
* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
* 线程池的种类:
* 线程池示例:
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
*/
/*threadpool.h*/
/* 线程池:
* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:
* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情
况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,
出现错误.
* 线程池的种类:
* 线程池示例:
* 1. 创建固定数量线程池,循环从任务队列中获取任务对象,
* 2. 获取到任务对象后,执行任务对象中的任务接口
*/
创建目录文件
先把接口设置好
把线程放进vector中维护管理起来
创建线程
再把任务拷贝过来
调用
线程一旦启动后
这样make以后,会有这些问题
线程在类内写的话会有this指针
这样的话参数个数就会有问题
加个static就好了
开始处理任务
上关锁
叫一下线程
指定条件变量休眠,检测到队列中没有任务(也就是检测临界资源)
空的时候休眠,把第一个任务拿出来
t()是处理任务
但编译不过
静态的成员不能访问类内的成员变量或者成员函数(可以访问静态的属性或者函数可以)
怎么解决呢
加一个this
完善一下
构建任务
这就是生产者消费者模型(进程池也类似:管道不就是任务队列吗:管道自带同步和互斥)
代码
ThreadPool.hpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <queue>
using namespace std;
struct ThreadInfo
{
pthread_t tid;
string name;
};
static const int defalutnum = 5; // 线程池有几个线程
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Weakup()
{
pthread_cond_signal(&cond_);
}
// 指定条件变量休眠,检测到队列中没有任务(也就是检测临界资源)
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Weakup();
Unlock();
}
void Pop(T *out)
{
Lock();
tasks_.pop();
Unlock();
}
string GerThreadName(pthread_t tid)
{
for(const auto &t : threads_)
{
if(t.tid == tid)
return t.name;
}
return "None";
}
static void *HandlerTask(void *args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
string name = tp->GerThreadName(pthread_self());
while (true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
bool IsQueueEmpty()
{
return tasks_.empty();
}
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;
pthread_mutex_t mutex_; // 锁
pthread_cond_t cond_; // 条件变量
};
main.cc
#include<iostream>
#include"ThreadPool.hpp"
#include"Task.hpp"
using namespace std;
int main()
{
ThreadPool<Task> *tp = new ThreadPool<Task>(5);
tp->Start();
while(true)
{
//1.构建任务
int x = rand() % 10 +1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);//x + y = op
tp->Push(t);
//2.交给线程池处理
cout<< "main thread make task " << t.GetTask() <<endl;
sleep(1);
}
return 0;
}
线程封装(我们写的基本都是原生的)做一下封装 C++中
isrunning线程是否是运行的
start_timestamp时间戳
接口封装一下
传入了一个函数指针 cb是方法
构造一下
run
join(等待)
入口方法
线程的封装就已经解决了
开始调用
线程也是一个对象了
多个线程了
把线程放进vector了
线程安全的单例模式
什么是单例模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
什么是设计模式
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大 佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这 些数据.
饿汉实现方式和懒汉实现方式
[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.
懒汉模式可以提高加载速度,需要时才使用
把线程池改成懒汉模式
需要有一个指针来指向我们当前线程池的单例
内部的第一个成员
想让这个对象只有一份,就需要加一个static
因为他是个静态成员,就需要在类外进行初始化
不希望能被拷贝和赋值,所以我们一般把线程池内部的赋值方法,赋值语句拷贝构造全部设置成私有的
把拷贝和赋值禁用一下
获取单例的一个接口
单例只有一个
问题:
就是多个线程一起用这个单例
多个线程如果都跑过来竞争是的来申请我们对应的GetInstance,都来申请了
可能被new成了多分
可用加锁解决
在静态里用锁,锁也要定义成静态的
直接用全局的
加锁
加一个这个的意义
abcd四个线程进来了,一块判断为不为空,然后每个线程都迫不及待的去竞争锁了,但只有一个线程能进去锁下面的代码,然后以后进来的别的线程,判断的时候就会直接走到return,这样就并发获取单例了
代码
ThreadPool.hpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <queue>
using namespace std;
struct ThreadInfo
{
pthread_t tid;
string name;
};
static const int defalutnum = 5; // 线程池有几个线程
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Weakup()
{
pthread_cond_signal(&cond_);
}
// 指定条件变量休眠,检测到队列中没有任务(也就是检测临界资源)
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Weakup();
Unlock();
}
void Pop(T *out)
{
Lock();
tasks_.pop();
Unlock();
}
string GerThreadName(pthread_t tid)
{
for (const auto &t : threads_)
{
if (t.tid == tid)
return t.name;
}
return "None";
}
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
string name = tp->GerThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_)
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
bool IsQueueEmpty()
{
return tasks_.empty();
}
private:
// 构造单例
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
private:
vector<ThreadInfo> threads_;
queue<T> tasks_;
pthread_mutex_t mutex_; // 锁
pthread_cond_t cond_; // 条件变量
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T>*ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
#pragma once
#include<iostream>
#include<string>
using namespace std;
string opers = "+-*/%";
enum
{
Div_zero = 1,
Mod_zero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op):data1_(x), data2_(y), oper_(op),
result_(0), exitcode_(0)
{}
void run()
{
switch(oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = Div_zero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = Mod_zero;
else result_ = data1_ / data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
void operator()()
{
run();
}
string GetResult()
{
string r = to_string(data1_);
r += oper_;
r += to_string(data2_);
r += '=';
r += to_string(result_);
r += "[code: ";
r += to_string(exitcode_);
r += "]";
return r;
}
string GetTask()
{
string r = to_string(data1_);
r += oper_;
r += to_string(data2_);
r += "=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
main.cc
#include<iostream>
#include<ctime>
#include"ThreadPool.hpp"
#include"Task.hpp"
using namespace std;
int main()
{
//错误的 :ThreadPool<Task> *tp = new ThreadPool<Task>(5);
//正确的
ThreadPool<Task>::GetInstance()->Start();
srand(time(nullptr)^getpid());
while(true)
{
//1.构建任务
int x = rand() % 10 +1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);//x + y = op
ThreadPool<Task>::GetInstance()->Push(t);
//2.交给线程池处理
cout<< "处理任务" << t.GetTask() <<endl<< "运行结果是:" << t.GetResult() << endl;
sleep(1);
}
return 0;
}
其他常见的各种锁
读者写者问题
读写锁
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的 机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地 降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
读写锁案例:
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void * reader(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void * writer(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id; };
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock() {
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
main: main.cpp
g++ -std=c++11 -Wall -Werror $^ -o $@ -lpthread
原文地址:https://blog.csdn.net/2401_83427936/article/details/142843573
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!