『Linux』 第八章 进程间通信
目录
1. 进程间通信介绍
进程间通信就是两个或多个进程进行数据层面的交互。因为进程间独立性的存在,导致进程间通信的成本比较高
1.1 进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 数据共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送信息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
而要实现以上的目的,都需要我们去实现进程间的通信
怎么实现进程通信?
进程通信的本质是,让不同的进程看到同一块资源。这个资源,就是特定形式的内存空间。
那这个资源由谁来提供呢? 一般是由操作系统来提供。因为如果由通信的进程来提供。那么,这份资源就只属于提供这份资源的进程。而进程具有独立性,如果允许其他进程访问修改进程的资源,就破坏了进程的独立性。所以进程通信所使用的资源只能由操作系统来提供。
进程访问这份资源空间进行通信。本质是在访问操作系统,而进程是用户的代表,但操作系统不相信用户,因为用户中有坏人。所以,操作系统需要从底层设计,从接口设计好一套逻辑,然后,给我们提供一个个的系统调用接口。我们通过这些接口,来完成资源的创建,使用和释放。
一般操作系统里,会有一个独立的通信模块,负责进程间的通信。他隶属于文件系统,也被称为IPC通信模块。
1.2进程间通信发展
- 管道(基于文件级别的通信方式)
- System V 进程间通信(用于计算机自身内部通信的标准)
- POSIX进程间通信(用于网络通信的标准)
1.3 进程间通信分类
管道 :
- 匿名管道pipe
- 命名管道
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信息量
- 互斥量
- 条件变量
- 读写锁
那么,我们下面就会依次进行讲解
2. 管道
2.1 什么是管道
管道是Unix中最古老的进程间通信的方式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
而管道分为匿名管道和命名管道
2.2匿名管道
#include <unistd.h>
功能:创建⼀⽆名管道
原型
int pipe(int fd[2]);
参数
fd:⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端
返回值:成功返回0,失败返回错误代码
例⼦:从键盘读取数据,写⼊管道,读取管道,写到屏幕
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
int main( void )
{
int fds[2];
char buf[100];
int len;
if ( pipe(fds) == -1 )
perror("make pipe"),exit(1);
// read from stdin
while ( fgets(buf, 100, stdin) ) {
len = strlen(buf);
// write into pipe
if ( write(fds[1], buf, len) != len ) {
perror("write to pipe");
break;
}
memset(buf, 0x00, sizeof(buf));
// read from pipe
if ( (len=read(fds[0], buf, 100)) == -1 ) {
perror("read from pipe");
break;
}
// write to stdout
if ( write(1, buf, len) != len ) {
perror("write to stdout");
break;
}
}
}
2.3 用fork来共享管道原理
2.4 站在文件描述符角度-深度理解管道
在此之前,我们先深度理解下管道的本质,它是怎么创建的,它是如何完成资源共享,数据互通的
匿名管道本质上是由内核管理的一块缓冲区,它本质上是一个没有名字的文件,用于实现进程间通信。这块缓冲区允许具有亲缘关系(如父子进程)的进程通过它进行数据的写入和读取。由于它是内存级的文件,因此数据交换速度较快。但是需要注意的是,匿名管道是半双工的,即同一时间内只能实现单向通信,如果需要双向通信,则需要创建两个管道,可以通过下图理解
那么,知道了管道实质上就是一个只有缓冲区的文件之后,我们也就理解了,为什么pipe函数返回的是文件描述符了,那么我们能否利用父子进程的特性,来实现父子进程之间的通信呢?
首先,我们创建出管道,使得下标为3的文件描述符指向了管道的读端,下标为4的文件描述符指向了管道的写端,然后fork出子进程,由于子进程会继承父进程大部分属性及内容的特性,使得子进程的fd3也指向了管道的读端,fd4指向了管道的写端,此时,我们在将父进程的读端fd3关闭,子进程的写端fd4关闭,至此,我们就是设计出了父进程向子进程通信的方案
那么这里需要注意的是:
- 必须要先创建管道,在folk子进程,否则,子进程不会继承到父进程的文件描述符,这正是这一特性,进程必须具有血缘关系,才能使用匿名管道通信
- 如果我们不关闭父进程的读和子进程的写(针对父向子通信)呢? 首先文件描述符也是资源,所以这会造成fd资源泄露,以及可能会造成误操作
2.5 站在内核角度-管道本质
上图就是抽象出来的file结构对于管道文件的链接。
所以,看待管道,就如同看待文件一样! 管道的使用就和文件一致,迎合了“Linux一切皆文件思想”。
2.5.1缓存区大小
既然管道本质上是一个缓存区,缓冲区大小由操作系统内核定义。大多数Unix和类unix系统中,匿名管道的默认缓冲区大小一般是4KB,至于具体是多少可以 输入 ulimit -a 命令查询
这里 单位是512字节,故缓冲区大小就是8*512 即4096字节,4KB,需要注意的是管道缓冲区大小是系统级的权限,普通用户无法直接修改它。 如果需要更改缓冲区大小,可能需要修改系统内核参数或重新编译内核,这通常需要管理员权限并且有一定的风险。
这里其实还没有结束
担当我们查看man手册的第七章的pipe函数时
我们发现
我们能确定管道是有固定大小的,但好像并不是所谓的4KB,它由版本来决定,我们的是65536byte。接着容量往后看我们会看到一个PIPE_BUF,它的大小至少是512bytes, 在Linux中 大小为4KB,并且在它的解释中说,小于PIPE_BUF,那么读写一定是原子性的,这是什么意思呢?假设我们写入的内容是"hello world!"。在我们写端写入hello的时候,读端不能只读hello。读端只能等写端写完“hello word!”后,再把“hello world!”一起读取。这种操作就做原子性。这对于之后的线程很重要!!
此时,你就明白了,原来,ulimit -a 显示的不是管道的最大容量,而是管道能保证读写原子性的最大容量。
2.6 管道样例
2.6.1 测试管道读写
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #define ERROR_EXIT(m) \ do{ \ perror(m); \ exit(EXIT_FAILURE); \ }while(0) // EXIT_FAILURE 是系统自己定义的宏,用于表示错误返回
int main()
{
int pipefd[2];
if(pipe(pipefd)==-1) // 创建管道
{
ERROR_EXIT("pipe error");
}
pid_t pid=fork(); // 创建管道之后创建子进程
if(pid==-1) ERROR_EXIT("fork error");
//开始关闭多余fd,本次实现为子向父通信,所以关闭子的读,父的写
if(pid==0){
//子
close(pipefd[0]);
write(pipefd[1],"Hello",5);
close(pipefd[1]);
exit(EXIT_SUCCESS);
}
//父
close(pipefd[1]);
char buff[10]={0};
read(pipefd[0],buff,10);
printf("buff= %s\n",buff);
return 0;
}
接下来,我们将其封装一下,形成Writer函数和Reader函数
注:我们之后编程就将采用C/C++混编的方式
2.6.2 Writer和Reader函数的实现
Writer:
在writer的实现中,我们会用到snprintf。这个函数是C语言提供的字符串级别格式的接口。
它的用法很简单,只是比printf函数多了两个参数。
第一个参数:传递地址,告诉它数据往哪里写。
第二个参数,写入大小,也就是写入数据的那段空间的大小,简单来说,就是最多能写多少数据。
后面也就和printf相同了。
编程运行,我们能看到数据正常写入buffer中
我们把测试代码注释掉,改为向文件里写入
编译运行,我们发现,结果竟然输出到了屏幕上!
这是为什么呢?
仔细看代码我们就可以发现,我传入的wfd是1,而我们之前讲过fd的前三个是系统自动分配好的,分别是 键盘——0——stdin, stdout——1——显示器 ,stderr——2——显示器,所以最后我们的结果就输出到了屏幕上
Reader:
v
整体代码:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <string>
#include <cstring>
using namespace std;
const int NUM=1024;
#define ERROR_EXIT(m) \
do{ \
perror(m); \
exit(EXIT_FAILURE); \
}while(0)
// EXIT_FAILURE 是系统自己定义的宏,用于表示错误返回
void Writer(int wfd)
{
string s="hello, I am child";
pid_t pid =getpid();
int number=0;
char buffer[NUM];
while(1)
{
buffer[0]=0; // 字符串清空,这里是为了提醒阅读代码的人,我把这个数组当做一个字符串
snprintf(buffer,sizeof(buffer),"%s-%d_%d\n",s.c_str(),pid,number++);
write(wfd,buffer,strlen(buffer)); // 这里需不需要 strlen +1 不需要,在文件中没有\0 作为字符串结尾的概念
// cout<<buffer<<endl;
sleep(1); //休息一秒,在写入
}
}
void Reader(int rfd)
{
char buffer[NUM];
while(true)
{
buffer[0]=0; // 清楚上次缓存
ssize_t n=read(rfd,buffer,sizeof(buffer)); // 写的话可以用strlen 但是读的话必须得用sizeof了
if(n>0)
{
buffer[n]=0;
cout<<"father get a message {"<< getpid()<<"]#"<<buffer<<endl;
}
}
}
int main()
{
int pipefd[2];
if (pipe(pipefd) == -1) // 创建管道
{
ERROR_EXIT("pipe error");
}
pid_t pid = fork(); // 创建管道之后创建子进程
if (pid == -1)
ERROR_EXIT("fork error");
// 开始关闭多余fd,本次实现为子向父通信,所以关闭子的读,父的写
if (pid == 0)
{
// 子
close(pipefd[0]);
Writer(pipefd[1]);
close(pipefd[1]);
exit(EXIT_SUCCESS);
}
// 父
close(pipefd[1]);
Reader(pipefd[0]);
pid_t rid=waitpid(pid,nullptr,0); // 等待子进程结束
if(rid<0) return 3; //回收失败返回值
close(pipefd[0]);
return 0;
}
编译运行,我们就能看到父进程会收到一条一条的数据。我之所以让number++,就是为了模拟出不同的动态信息。
2.7 管道特征
我们打开脚本后,再次运行程序。
do while:; do ps -ajx | grep process3 | grep -v grep;
sleep 1; echo "############"; done;
我们不难发现,每隔一秒,父进程才打印一次。
可是,Reader函数中并没有sleep函数啊! 为什么,父进程会等待一秒,再打印?
这是因为父进程等待了子进程。我们称这种情况为父子协同。
在后面我们会学到,多线程。一个线程,就是一个执行流。多个执行流访问共享资源,还需要考虑同步互斥。管道这里也是一种共享资源(临界资源),我们让父子进程只能单向通信,某种意义上也是同步互斥。这些我们后面会详细讲解的。
父子协同,同步协同,是为了保证管道数据的安全性。
对于管道中的数据,会出现如下四种情况:
第一种: 读写端正常,管道如果为空,读端就要阻塞起来。这种情况就是我们刚刚程序运行的情况。
第二种:读写端正常,管道如果被写满了,写端就要被阻塞起来。
这个我们可以简单验证一下:
修改Writer函数的代码,让子进程一个一个字节的写入(刚好可以验证下管道的大小),每写一次,number加1,并且打印number的值。
在Reader函数中,增加一个sleep函数,休息50秒,别让父进程读取数据那么快
编译运行,程序会快速打印很多的数字,过了一会,写端写满了数据,就不再写入了。也就是写端阻塞住了。
再过上几十秒,我们才能看到父进程向显示器写入数据。我们写入数据的时候,是一个一个字节写的。可为什么我们读数据的时候,是一次性全部读完呢?这是因为管道是面向字节流的。你想让它切割成一个一个字符去读取,它可不管那么多。在它看来,这些就是一个个的字节。
第三种: 读端正常读,写端关闭,读端就会读到0,表明读到文件(pipe)的结尾,不会被阻塞。
我们简单的验证一下:
我们让Writer函数的while循环在number>5的时候,break。也就是写端写入5次之后关闭。
Reader函数增加一个输出,打印变量n的值。
编译运行,我们就能看到写端关闭后,读端并不会被堵塞住。
第四种:写端正常写入,读端关闭了,操作系统就要杀死正在写入的进程。如何杀掉呢?通过信号。 那么几号信号呢? ——13
我们可以简单验证一下:
我们需要做三步:
第一步: 修改Write函数
把Write函数写入写数据的方式,改成一开始的方式,让子进程不断向管道写入内容。
第二步:修改Reader函数
在Reader函数,增加一个变量cnt,用来计数,充当计数器。当它大于等于5的时候,我们结束循环
第三步:修改main函数Reader调用处往后的代码
我们把关闭读端的动作提前到子进程回收之前。在关闭父进程的读端后,我们打印相应的提示。再让父进程休息5秒后,再回收子进程,这样方便我们观察。回收子进程之后,我们打印子进程退出的信息。最后,在打印提示信息,告诉我们父进程退出了。
编译运行,打开脚本监控
while :; do ps ajx | head -1 && ps axj | grep process4 | grep -v grep; sleep 1; done;
我们就能看到如下运行结果。
我们总结一下管道的特征:
1. 具有血缘关系的进程,才能通过管道进行通信。
2. 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
3.父子进程时会进程协同的,同步与互斥---保护管道文件的数据安全。
4. 管道是面向字节流的。
5. 管道时基于文件的,而文件的生命周期时随进程的!进程结束,文件也就关闭了。
2.8 管道的应用场景
第一个应用场景(指令)
如下面指令使用的管道 和我们刚刚所说的匿名管道是什么关系呢?
cat test.txt | head -10 | tail -5
我简单的写一个指令你就明白了。
指令:
sleep 66666 | sleep 77777 | sleep 88888
脚本:
while :; do ps -ajx | head -1 && ps -ajx | grep sleep | grep -v sleep | sleep 1; done;
我写的这三个sleep指令,虽然不会产生什么数据,但是第一个sleep指令的结果还是会转交给第二个sleep,第二个sleep处理后,再转给第三个sleep。通过监控结果来看,我们不难看出这三个sleep指令,是由三个不同的进程来执行的,并且它们具有同一个父进程,也就是具有血缘关系。具有血缘关系的进程之间的数据交互,这是谁的特征? 不就是我们刚刚学习的管道么?我们三个sleep中使用的管道,很明显没有名字吧! 准确来说,我们指令中用的是匿名管道。
第二个应用场景(如何创建管道与进程 shell)
还记得,我们模拟的微型shell吗? 我们也可以让它支持管道的功能。
怎么实现呢?
1.思路
第一步: 分析输入的命令字符串,统计管道的个数,将命令打散为多个子命令字符串。
第二步: malloc申请空间,pipe先申请多个管道。
第三步: 循环创建多个子进程,针对子进程进行重定向。最开始:输出重定向,1->指定的一个管道的写端,中间,输入输出重定向,0标准输入重定向到上一个管道的读端,1标准输出重定向到下一个管道的写端。
第四步: 分别让不同的子进程执行不同的命令 --- exec* --exec*不会影响该进程曾经打开的文件,不会影响预先设计好的管道重定向。
因此,我们只需要一边创建管道,一边创建进程(最后一个进程单独创建)
并且用vector<pair<int,int>>存储所有的管道的读写端方便后续管道对其进行关闭(因为管道的特点是读/写端没有全都退出,另一端就会一直阻塞等待,如果不关闭读写端的话,父进程回收时就必须要逆序回收)
又因为我们要让父进程回收子进程,所以在用一个vector<int>存储所有子进程的pid,后续还要waitpid回收它们呢
2预处理
在检查是否需要重定向之前,我们需要先检查整个字符串有多少个‘|’命令行管道,并且
[1] 用一个vector<int>保存他们的位置,并且对应位置改为‘\0’,方便后续把每一块指令传递给子进程去处理
[2] 因为第一个指令左侧没有'|',所以我们在初始化vector<int>的时候,可以先存上一个-1,然后分配任务就好分配了。
3.实现
在全局申请一个存放管道位置的vector变量PipePos
4. 完整代码
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <vector>
const int basesize = 1024;
const int envnums = 64;
const int argcnums = 64;
// 我的系统的环境变量
char *Genv[envnums];
char *Gargv[argcnums];
int Gargc;
// 全局的当前工作路径
char pwd[basesize];
char pwdenv[basesize+5];
char hostname[100];
// 最后一次运行的返回值
int lastcode = 0;
// 全局变量与重定向有关
#define NoneRedir 0
#define InputRedir 1
#define OutputRedir 2
#define AppendRedir 3
int Redir = NoneRedir;
char *FileName = nullptr;
#define Trim(pos) \
do \
{ \
while (isspace(*pos)) \
pos++; \
} while (0);
enum RedirError
{
InReDirError = 1,
OutReDirError = 2,
AppReDirError = 3,
FileNameError = 4
};
std::vector<int> PipePos; // 全局变量,存放管道的位置
void InitEnv()
{
extern char **environ;
int index = 0;
while (environ[index])
{
Genv[index] = new char[strlen(environ[index]) + 1];
strncpy(Genv[index], environ[index], strlen(environ[index]) + 1);
index++;
}
Genv[index] = nullptr;
}
std::string GetPwd()
{
if (nullptr == getcwd(pwd, sizeof(pwd)))
return "None";
snprintf(pwdenv, sizeof(pwdenv), "PWD=%s", pwd);
putenv(pwdenv);
return pwd;
}
std::string GetLastDir()
{
std::string cur = GetPwd();
if (cur == "" || cur == "None")
return cur;
size_t pos = cur.rfind("/");
return cur.substr(pos + 1);
}
std::string GetHostName()
{
gethostname(hostname,sizeof(hostname));
return hostname==nullptr?"None":hostname;
}
std::string GetUserName()
{
std::string name = getenv("USER");
return name.empty() ? "None" : name;
}
std::string MakeCommandLine()
{
// [yang@huawei-cloud myshell]$
char command_line[basesize];
snprintf(command_line, basesize, "[%s@%s %s]&", GetUserName().c_str(), GetHostName().c_str(), GetLastDir().c_str());
return command_line;
}
void PrintCommandLine()
{
printf("%s", MakeCommandLine().c_str());
fflush(stdout);
}
bool GetCommandLine(char command_buffer[], int size)
{
// 我们认为:我们要将用户输入的命令行,当做一个完整的字符串
// "ls -a -l -n"
char *result = fgets(command_buffer, size, stdin);
// printf("%p\n %p\n",result,command_buffer);
if (!result)
{
return false;
}
command_buffer[strlen(command_buffer) - 1] = 0; // 将结尾的回车设为0
if (strlen(command_buffer) == 0)
return false;
return true;
}
void ParseCommand(char command_buffer[])
{
memset(Gargv, 0, sizeof(Gargv)); // 清空之前的代码痕迹
Gargc = 0;
//"ls -l -a -n"
const char *const sep = " ";
Gargv[Gargc++] = strtok(command_buffer, sep); // 将第一个空格前的值切给Gargv[0];
// 将剩下的值依次切给Gargv, 这里=的含义是 将切完的Gargv的值转为bool 进行比较
while ((bool)(Gargv[Gargc++] = strtok(nullptr, sep)))
;
Gargc--;
}
void ResetCommandLine()
{
memset(Gargv, 0, sizeof(Gargv));
Gargc = 0;
Redir = 0;
FileName = nullptr;
}
void ParseRedir(char command_buffer[], int len)
{
int end = len;
while (end >= 0)
{
if (command_buffer[end] == '<')
{
Redir = InputRedir;
command_buffer[end] = 0;
FileName = &command_buffer[end] + 1;
Trim(FileName); // 文件名去空格
break;
}
else if (command_buffer[end] == '>')
{
if (command_buffer[end - 1] == '>')
{
Redir = AppendRedir;
command_buffer[end - 1] = 0;
command_buffer[end] = 0;
FileName = &command_buffer[end] + 1;
Trim(FileName);
break;
}
else
{
Redir = OutputRedir;
command_buffer[end] = 0;
FileName = &command_buffer[end] + 1;
Trim(FileName);
break;
}
}
else
end--;
}
}
void ParseCommandLine(char command_buffer[], int len)
{
ResetCommandLine();
ParseRedir(command_buffer, len);
ParseCommand(command_buffer);
}
void DoRedir()
{
if (Redir == NoneRedir)
return;
if (FileName == nullptr)
exit(FileNameError);
if (Redir == InputRedir)
{
int fd = open(FileName, O_RDONLY);
if (fd < 0)
{
exit(InReDirError);
}
else
dup2(fd, 0);
}
else if (Redir == OutputRedir)
{
int fd = open(FileName, O_CREAT | O_WRONLY | O_TRUNC, 0666);
if (fd < 0)
exit(OutReDirError);
else
dup2(fd, 1);
}
else if (Redir == AppendRedir)
{
int fd = open(FileName, O_CREAT | O_WRONLY | O_APPEND, 0666);
if (fd < 0)
exit(AppReDirError);
else
dup2(fd, 1);
}
}
bool ExecuteCommand()
{
// 让子进程执行
pid_t id = fork();
if (id < 0)
return false;
if (id == 0)
{
// 子进程部分
// 重定向
DoRedir();
// 1.执行命令
execvpe(Gargv[0], Gargv, Genv);
// execve(Gargv[0],Gargv,Genv);
// 2.退出 正常是不会返回的,如果执行了下面的就说明出错了,所以退出值为1
exit(1);
}
int status = 0;
pid_t rid = waitpid(id, &status, 0);
if (rid > 0)
{
if (WIFEXITED(status))
{
lastcode = WEXITSTATUS(status);
}
else
{
lastcode = 100;
}
return true;
}
return false;
}
void AddEnv(const char *item)
{
int index = 0;
while (Genv[index])
{
index++;
}
Genv[index] = new char[strlen(item) + 1];
strncpy(Genv[index], item, strlen(item) + 1);
Genv[++index] = nullptr;
}
bool CheckAndExecBuilCommand()
{
if (strcmp(Gargv[0], "cd") == 0)
{
if (Gargc == 2)
{
chdir(Gargv[1]);
lastcode = 0;
}
else
{
lastcode = 1;
}
return true;
}
else if (strcmp(Gargv[0], "export") == 0)
{
// export 也是内建命令 因为子进程改变了,父进程不会改变
if (Gargc == 2)
{
AddEnv(Gargv[1]);
lastcode = 0;
}
else
{
lastcode = 1;
}
return true;
}
else if (strcmp(Gargv[0], "env") == 0)
{
for (int i = 0; Genv[i]; i++)
{
printf("%s\n", Genv[i]);
}
lastcode = 0;
return true;
}
else if (strcmp(Gargv[0], "echo") == 0)
{
if (Gargc == 2)
{
// echo $
// echo $PATH
// echo hello
if (Gargv[1][0] == '$')
{
if (Gargv[1][1] == '?')
{
printf("%d\n", lastcode);
lastcode = 0;
}
else
{
char *temp = &Gargv[1][1];
printf("%s\n", getenv(temp));
lastcode = 0;
}
}
else
{
printf("%s\n", Gargv[1]);
lastcode = 0;
}
}
else
{
lastcode = 3;
}
return true;
}
return false;
}
void ParseAndExecCommand(char command_buffer[])
{
ParseCommandLine(command_buffer, strlen(command_buffer)); // 3.分析命令
if (!CheckAndExecBuilCommand()) // 内建指令,因为shell有部分指令只能又自己调用,而这类指令我们称之为内建指,这里就是检查输入是否为内建指令
ExecuteCommand(); // 4. 执行命令
}
int CheckPipe(char command_buffer[])
{
PipePos.clear(); // 清空之前的管道位置
PipePos.push_back(-1); // 管道位置初始化
for (int i = 0; i < strlen(command_buffer); i++)
{
if (command_buffer[i] == '|')
{
PipePos.push_back(i); // 记录管道位置
command_buffer[i] = '\0'; // 将管道符号替换为空格,以便后续分割
}
}
return PipePos.size(); // 返回管道的数量
}
void Execpipe(char command_buffer[])
{
std::vector<int> child_id;
std::vector<std::pair<int, int>> pipe_fd; // 管道的读写文件描述符
// v[i].first: i号管道的读文件描述符
// v[i].second: i号管道的写文件描述符
for (int i = 0; i < PipePos.size() - 1; i++) // 创建sz个进程,sz-1个管道
{
int pipefd[2];
pipe(pipefd);
pid_t pid = fork();
if (pid == 0) // 子进程
{
close(pipefd[0]); // 关闭读端
int prev_fd_sz = pipe_fd.size();
// 因为新开的子进程会继承父进程的文件描述符,而父进程的文件描述符是不关闭的,所以每一次新开子进程,都需要将前面的不符合的文件描述符关闭
for (int j = 0; j < prev_fd_sz; j++)
{
if (j < prev_fd_sz - 1)
{
close(pipe_fd[j].first); // 将前面的读端关闭,除了最近的一个管道
}
close(pipe_fd[j].second); // 将前面的写端关闭
}
dup2(pipefd[1], 1); // 将写端重定向到管道的写端
if (!pipe_fd.empty()) dup2(pipe_fd.back().first, 0); // 将读端重定向到上一个管道的写端
// 发送命令给子进程执行
ParseAndExecCommand(command_buffer + PipePos[i] + 1);
// 这里 pos[0]=-1,作用也就在这里.
exit(0);
}
else // 父进程
{
pipe_fd.push_back({pipefd[0],pipefd[1]}); // 父进程记录管道的读写文件描述符
child_id.push_back(pid);
}
}
// 最后一个子进程
pid_t pid = fork();
if (pid == 0) // 子进程
{
close(pipe_fd.back().second); // 关闭写端
int prev_fd_sz = pipe_fd.size();
for (int j = 0; j < prev_fd_sz; j++)
{
if (j < prev_fd_sz - 1)
{
close(pipe_fd[j].first); // 将前面的读端关闭,除了最近的一个管道
}
close(pipe_fd[j].second); // 将前面的写端关闭
}
dup2(pipe_fd.back().first, 0); // 将读端重定向到上一个管道的读端
// 发送命令给子进程执行
ParseAndExecCommand(command_buffer + PipePos.back() + 1);
exit(0);
}
child_id.push_back(pid);
//父进程关闭所有的读写端
for(auto& e:pipe_fd)
{
close(e.first);
close(e.second);
}
// 等待子进程结束
for (int i = 0; i < child_id.size(); i++)
{
int status = 0;
pid_t rid = waitpid(child_id[i], &status, 0);
if (rid > 0)
{
if (WIFEXITED(status))
{
lastcode = WEXITSTATUS(status);
}
else
{
lastcode = 100;
}
if(lastcode!=0) printf("warning: last command is %d\n", lastcode);
else printf("waitpid success\n");
}
}
}
int main()
{
InitEnv();
char command_buffer[basesize];
while (true)
{
PrintCommandLine(); // 1. 命令行提示符
// command_buffer -> output
if (!GetCommandLine(command_buffer, basesize)) // 2.获取用户命令
{
continue;
}
int sz = CheckPipe(command_buffer); // 3.检查是否有管道
// 4. 如果没有管道,则直接执行命令
if (sz == 1)
ParseAndExecCommand(command_buffer);
// 5. 如果有管道,则创建子进程,并将命令分割为多个命令,然后创建多个子进程,并将命令发送给子进程执行
else
Execpipe(command_buffer);
}
return 0;
}
第三个应用场景 (进程池)
使用管道实现一个简易版本的进程池。
大家都知道在一些干旱的地区,十分缺水。村里的人每次取水,都要到十里外取水。每次取水都要走很长的路,成本很高。于是,就有人想。我们可不可以建一个蓄水池,然后,请一辆车,把水运到我们的蓄水池中。这样每次取水就直接到蓄水池取水就可以了。这就是池化技术。我们每次执行指令都需要创建进程。调用系统接口fork也是有成本的,所以,我们可以提前创建好一批进程,用数组保存起来,等指令来了,直接分配给进程进行执行即可。
进程池的用途,我们了解了,那么该如何实现呢?
思路:
我们可以创建一批子进程和一批管道。让父进程通过管道的写端写内容,子进程再从管道的读端读内容(这里也可以将子进程的标准输入改为管道的读端),这样就可以实现一个任务派发的过程了。然后,子进程拿到相应的指令,执行就可以了。
这时就又衍生两个问题:
1. 什么是任务? —— 任务码,int
2. 怎么派发任务 —— 子进程采用轮询、随机、历史任务数等方法分发任务,要求各个子进程任务量差不多,负载均衡。
后期,我们也可以通过网络来传递任务...
实现:
我们重新创建五个文件
一个是Makefile,便于我们编译程序
ProcessPool.hpp 用于模拟实现进程池,Task.hpp,用来模拟我们的任务,Channel.hpp 用于封装管道,Main.cc主体程序运行(.hpp是c++的头文件),
这里也可以将多个文件写到一块,分开写是为了解耦合,实现模块化编程,提高代码复用度。
第一步;
父进程和每个子进程之间都要有一个管道,我们该如何把它们管理起来呢,是不是先描述,在组织! 我们需要封装一个channel 结构体来描述父子进程之间通信的管道信息,再用一个vector 组织起来使用
封装时,需要注意,使用宏定义,防止自定义头文件重复包含
Channel的封装代码:
#ifndef __CHANNEL_HPP__ // 防止重复包含
#define __CHANNEL_HPP__
#include<iostream>
#include <unistd.h>
#include <string>
// 定义一个管道类 先描述
class Channel
{
public:
Channel(int wfd, pid_t who) : _wfd(wfd), _who(who)
{
//Channel-3-1234
_name="Channel-"+std::to_string(wfd)+"-"+std::to_string(who);
}
void Send(int cmd)//cmd 任务码
{
::write(_wfd, &cmd, sizeof(cmd)); // 写入任务码
}
void close()
{
::close(_wfd);
}
std::string getName() const { return _name; }
int getWfd() const { return _wfd; }
pid_t getWho() const { return _who; }
~Channel() {}
private:
// 因为是进程池,我们管道只需要记录谁给的任务,即父进程的信息
int _wfd; // 写端文件描述符
std::string _name; // 管道名字
pid_t _who; // 读端子进程的pid
};
#endif // __CHANNEL_HPP__
第二步:
初始化进程池,让父子进程之间建立管道
Task.cpp的子进程初始任务这里,我们先做个简单测试,看管道的建立情况
Main.cc 中的内容
编译运行,父子进程之间的通信管道建立成功
第三步:实现多个任务
我们可以设计多个任务,类似于任务池,然后父进程随机挑选任务发给子进程
我们再将Worker(原始任务,slaver主从任务) 更新执行任务部分
第四步:
实现子进程分配任务?
那么,我们应该如何分配呢? 我们可以采用随机数的方式或者轮转遍历的方式,这两种方式都能做到让每个进程都被调度到,而不是某一个进程一直在执行任务,其他进程闲着。我们称这种情况为负载均衡。
再更新下Main.cc ,编译运行
至此,我们就实现了进程池的创建与初始化以及分配任务
最后,我们实现进程池的回收
这里需要注意,如果建立子进程的时候,历史管道的wdf子进程没有关掉,那么这里回收进程池就必须得从后向前回收,或者写成两个循环
我们前面说过,关闭写端,读端并不会阻塞住,会继续执行,而我们在主从(slaver)任务中写了,如果当子进程从管道中读到0,则自动退出。
完整代码如下:
Main.cc
#include "ProcessPool.hpp"
#include "Task.hpp"
int main()
{
ProcessPool p(5);
p.InitProcessPool();
p.DispathTask(5);
p.QuitProcessPool();
return 0;
}
Channel.hpp
#ifndef __CHANNEL_HPP__ // 防止重复包含
#define __CHANNEL_HPP__
#include<iostream>
#include <unistd.h>
#include <string>
// 定义一个管道类 先描述
class Channel
{
public:
Channel(int wfd, pid_t who) : _wfd(wfd), _who(who)
{
//Channel-3-1234
//记录管道 通过3可以写到pid为1234的子进程中
_name="Channel-"+std::to_string(wfd)+"-"+std::to_string(who);
}
void Send(int cmd) //cmd 任务码
{
::write(_wfd, &cmd, sizeof(cmd)); // 写入任务码
}
void close()
{
::close(_wfd);
}
std::string getName() const { return _name; }
int getWfd() const { return _wfd; }
pid_t getWho() const { return _who; }
~Channel() {}
private:
// 因为是进程池,我们管道只需要记录谁给的任务,即父进程的信息
int _wfd; // 写端文件描述符
std::string _name; // 管道名字
pid_t _who; // 读端子进程的pid
};
#endif // __CHANNEL_HPP__
Task.hpp
#ifndef TASK_HPP
#define TASK_HPP
#include <unistd.h>
#include <iostream>
#include <vector>
#include <functional>
using task_t =std::function<void()>;
class TaskPool
{
public:
TaskPool()
{
srand(time(nullptr));
_Tasks.push_back([](){
std::cout<<"sub process[" << getpid() <<" ] 执⾏访问数据库的任务\n";
});
_Tasks.push_back([](){
std::cout<<"sub process[" << getpid() <<" ] 执⾏url解析\n";
});
_Tasks.push_back([](){
std::cout<<"sub process[" << getpid() <<" ] 执⾏加密任务\n";
});
_Tasks.push_back([](){
std::cout<<"sub process[" << getpid() <<" ] 执⾏数据持续化任务\n";
});
_TaskNum=_Tasks.size();
}
// 实现随机选择任务码
int SelectTasks()
{
return rand()%_TaskNum;
}
void Execut(unsigned long number )
{
if(number>=_TaskNum || number < 0 )
return ;
_Tasks[number]();
}
size_t GetTaskNum()
{
return _TaskNum;
}
void AddTask(task_t task)
{
_Tasks.push_back(task);
}
~TaskPool(){}
private:
std::vector<task_t> _Tasks;
unsigned long _TaskNum;
};
TaskPool taskPool;
void Worker() //也可叫slaver函数,主从函数,用于子进程接受父进程任务执行 //初始任务
{
while(true)
{
int cmd=0; //任务码
int n=::read(0,&cmd,sizeof(cmd));
if(n==sizeof(cmd))
{
//执行任务
taskPool.Execut(cmd);
}
else if(n==0)
{
std::cout<<"pid:"<<getpid()<<" exit"<<std::endl;
break;
}
}
}
#endif // TASK_HPP
ProcessPool.hpp
#ifndef PROCESSPOOL_HPP
#define PROCESSPOOL_HPP
#include <iostream>
#include <vector>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <functional>
#include "Channel.hpp"
#include "Task.hpp"
typedef std::function<void()> Work_t; // 回调函数
enum
{
OK = 0,
UsageError,
PipeError,
ForkError
};
class ProcessPool
{
public:
ProcessPool(int n, Work_t work) : _processNum(n), _work(work) {}
ProcessPool(int n) : _processNum(n) {}
// 初始化进程池
int InitProcessPool()
{
// 1. 创建指定个数个进程
for (int i = 0; i < _processNum; i++)
{
// 1. 先要有管道
int pipefd[2];
if (pipe(pipefd) < 0)
return PipeError;
// 2. 创建子进程
pid_t pid = fork();
if (pid < 0)
return ForkError;
// 3. 建立通信管道
if (pid == 0)
{
// 子进程
// 关闭历史wfd
std::cout << getpid() << " child close history fd: ";
for (auto x : _channels)
{
// std::cout<<x.getWfd()<<' ';
x.close();
}
std::cout << std::endl;
// std::cout<<"debug"<<' '<<pipefd[1]<<std::endl;
close(pipefd[1]); // 关闭当前写端
// 修改读端为标准输入
dup2(pipefd[0], 0);
_work(); // 执行原始动作
exit(0);
}
else
{
// 父进程
close(pipefd[0]); // 关闭当前读端
_channels.push_back(Channel(pipefd[1], pid)); // 保存管道
}
}
return OK;
}
void DispathTask(unsigned long nums)
{
// 这里采用轮询遍历法
int ir = 0;
// 2.分发任务
while (nums--)
{
// int taskid=0;
// // std::cin>>taskid;
// std::cout<<"收到任务:"<<taskid<<std::endl;
// if(taskid>=taskPool.GetTaskNum() || taskid<0)
// {
// std::cout<<taskPool.GetTaskNum()<<std::endl;
// // 任务池中没有该任务,跳过
// std::cout<<"任务池中没有该任务,跳过"<<std::endl;
// nums++;
// continue;
// }
// a.选择一个任务,整数
int taskid = taskPool.SelectTasks();
// b.选择一个子进程channel
Channel &curr = _channels[ir++];
// 打印消息
std::cout << "#############################################\n";
std::cout << getpid() << " dispatch task " << taskid << " to " << curr.getName() << std::endl;
std::cout << "任务还剩:" << nums << std::endl;
std::cout << "#############################################\n";
// c.发送任务
curr.Send(taskid);
sleep(1);
}
}
// 进程池的回收
void QuitProcessPool()
{
// Version 3:
for ( auto x : _channels)
{
x.close(); // 关闭所有管道的写端
pid_t rid = waitpid(x.getWho(), NULL, 0); // 等待所有子进程结束,回收子进程
if (rid > 0)
{
std::cout << "child" << rid << "wait... success" << std::endl;
}
}
// Version 2: 关闭所有管道的写端的同时,等待子进程结束,回收子进程
// for(auto x:_channels)
// 这里需要注意如果我们建立子进程的时候,没有关闭之前的历史wfd,那么这里会导致前面管道的wfd被后面多个子进程继承,此时必须从后往前关闭wfd
// for(int i=_channels.size()-1;i>=0;i--)
// {
// _channels[i].close(); // 关闭所有管道的写端
// pid_t rid=waitpid(_channels[i].getWho(),NULL,0); // 等待所有子进程结束,回收子进程
// if(rid>0)
// {
// std::cout<<"child"<<rid<<"wait... success"<<std::endl;
// }
// }
// Version 1: 等待所有子进程结束,回收子进程
// for(auto x:_channels)
// {
// x.close(); // 关闭所有管道的写端
// }
// for(auto x:_channels)
// {
// pid_t rid=waitpid(x.getWho(),NULL,0); // 等待所有子进程结束,回收子进程
// if(rid>0)
// {
// std::cout<<"child"<<rid<<"wait... success"<<std::endl;
// }
// }
}
private:
std::vector<Channel> _channels;
int _processNum; // 进程数
Work_t _work = Worker; // 执行的任务函数
};
#endif // PROCESSPOOL_HPP
2.9. 命名管道
- 管道的应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,而它也经常被称为命名管道。
- 命名管道是一种特殊的文件。
2.9.1 创建一个命令管道
命令管道可以从命令行上创建,命令行方法是使用下面这个命令
$ mkfifo filename
命令管道也可以从程序里创建,相关函数有:
创建命名管道:
int main()
{
mkfifo("myfifo",0644);
return 0;
}
编译运行之后就会产生一个管道文件
2.9.2 匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道) 与pipe(匿名管道)之间唯一的区别在于它们创建与打开的方式不同,一旦这些工作完成之后,它们具有相同的语义。
命名管道的使用
我们向命名管道中写入一句"hello world",然后,进程就阻塞住了。
不用慌,这是因为我们向管道中写入数据,但是管道中的数据没有被读走,所以进程陷入阻塞
这时,我们再打开一个窗口,使用cat查看管道,将管道中的数据读走,这样另一个窗口也就恢复了
2.9.3 命名管道的原理
思考一个问题,如果两个进程,打开同一个文件的时候,在内核中,操作系统会打开几个文件?
答案是一个,两个进程会有自己相应的struct file结构体,但操作系统只会打开一个文件,被两个struct file指向。
两个不同的进程的strcut file结构体,指向同一个文件,这不就做到了两个进程看到了同一份资源了么?
那么我们是如何知道两个进程打开的是同一个文件呢?
通过判断两个进程是不是打开同一路径下同一个文件名的文件,因为路径+文件名具有唯一性。
路径+文件名具有唯一性,那我直接让两个进程打开同一个普通文件,不就看到同一份资源了吗?何必大费周章弄一个管道文件,
当然没那么简单,普通文件会把数据刷新到磁盘上,进程如果从普通文件读取,又要跑到磁盘上拿。这样是不是太麻烦了? 我们只是想让两个进程之间进行通信,文件的内容并不需要刷新到磁盘上。这样的文件,我们称之为内存级文件。
其实,命名管道底层的逻辑和匿名管道时一致的。那它为什么叫命名管道呢? 因为它是真真切切的是有路径,有名字的。
2.9.4 使用命名管道进行通信
第一步:我们创建一个client.cc 表示客户端,创建一个server.ccb表示服务端,创建一个common.hpp 用来存放客户端和服务器都会使用的功能
第二步,服务器用mkfifo函数创建管道
编译运行,管道就被建立好了
那么,使用完管道之后,我们希望可以直接删除掉管道文件,而删除文件,就需要用到函数unlink函数
现在有了管道了,下一步进行通信
第三步:让客户端向管道中写,服务器从管道读
server.cc
client.cc
这里,其实我们也可以把代码写的更加优雅一点,我们可以创建一个FIFO类,而FIFO类的初始化就是管道的创建,销毁就是管道的删除。
至此,命名管道的通信,我们就算是完成了。那么我们能否利用命名管道模拟实现一个匿名管道文章那样的进程池呢? 当然可以了
但是过程其实和匿名管道的写法差不多,只是创建的差异,实现上也更加的简单 ,所以我们这里也就不实现了,直接开始下一个章节
3. system V共享内存
共享内存是最快的IPC(进程通信)形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递将不在涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
3.1共享内存示意图
3.2 共享内存数据结构
struct shmid_ds
{
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment
(bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current
attaches */
unsigned short shm_unused; /* compatibility */
void shm_unused2;
/ ditto - used by DIPC * /
void shm_unused3;
/ unused * /
};
struct ipc_perm
{
__key_t __key;/* Key. */
__uid_t uid;/* Owner's user ID. */
__gid_t gid;/* Owner's group ID. */
__uid_t cuid;/* Creator's user ID. */
__gid_t cgid;/* Creator's group ID. */
__mode_t mode;/* Read/write permission. */
unsigned short int __seq;/* Sequence number. */
unsigned short int __pad2;
__syscall_ulong_t __glibc_reserved1;
__syscall_ulong_t __glibc_reserved2;
};
3.3 ipcs指令
ipcs用于显示消息队列、信号量和共享内存段等信息
- -q:仅显示消息队列的信息。
- -m:仅显示共享内存段的信息。
- -s:仅显示信号量的信息。
- -a:显示所有信息,是默认的输出信息。
- -b:显示每个 IPC 对象的最大允许大小。
- -c:显示创建者的用户名和组名。
- -o:显示未完成的使用信息。
- -p:显示进程号信息。
- -t:显示时间信息。
3.4 共享内存原理
我们知道,进程间通信的本质是,先让不同的进程看到同一份资源。
那么假设现在有A和B两个进程,操作系统在物理内存,申请了一段空间。然后,把这段空间通过页表,映射到A进程的共享区。映射到共享区后,会占用一段空间,把这段空间的虚拟地址的起始地址,返回给进程A。进程A就可以看到这段空间了。我们用同样的方式,映射到进程B的共享区。这样A和B进程就能够看到同一份资源了。这这就是共享内存的原理。
建立共享内存,我们可以理解成以下三步:
第一步: 申请内存
第二步:挂接到进程地址空间
第三步:返回虚拟地址的起始地址给进程
那么我们要如何释放共享内存呢?
第一步:解除进程和共享内存的关联
第二步:释放共享内存
在建立和释放共享内存的过程中,进程是直接做的吗?不是,进程作为需求方,告诉执行方操作系统,自己的需求。执行方根据需求方的需求进行操作
而申请共享内存的需求只有一个吗? 在操作系统中,会存在很多的共享内存。这么多的共享内存,操作系统需不需要管理起来?怎么管理? 先描述,在组织!
操作系统中,会存在一个内核数据结构,用来描述共享内存。
3.5进程间通信代码实现:
进程通信,实际上主要就是给两个进程解决两个问题,
一: 如何创建以及如何使用
二:如何获取以及如何使用
建立共享内存,我们用到的函数叫shmget。
key: 创建的共享内存段的名字
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的。
我们主要用到的是IPC_CREATE 和IPC_EXCL (不单独使用)
当取值为IPC_CREATE :共享内存不存在,创建并返回;共享内存已存在,获取并返回。
当取值为IPC_CREATE | IPC_EXCL :共享内存不存在,创建并返回;共享内存已存在,出错返回。
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
对于key,我们需要有5点认识
1. key是一个数字,它能让不同进程标识同一块共享内存
2.第一个进程通过key创建共享内存,第二个往后的进程,只要拿着同一个key就可以和第一个进程看到同一个共享内存了。
3.对于一个已经创建好的共享内存,key在哪里? ——key在共享内存的描述对象中,即内核结构体
4. 第一次创建共享内存的时候,必须要有一个key且不会重复
5.key,类似路径,具有唯一性
那么,基于以上五点,这个key到底该怎么来?
这个key必须由用户决定,因为如果由内核自己生成的话,就会导致这个key传不出来,进程间还是无法看到同一个共享内存。
那么如果key由用户决定的话,就会引出另一个问题,你怎么保证key的唯一性,怎么保证不同进程看到的是同一个共享内存呢?
只要我们保证了key的唯一性,并且维护一个key对应一个共享内存,那么就可以保证不同进程在拥有同一个key的情况下,看到的是同一个共享内存,而key的唯一性我们是由通过一个算法来保证的 ,而这个也已经被封装到库中了,我们可以在代码中通过库来调取使用
ftok函数的第一个参数是路径名,第二个参数是一个整数,随便设,ftok实际就是对pathname和proj_id进行了数值计算,并不会查这个key有没有被使用并且pathname唯一,proj_id唯一也无法保证最后的key的值是唯一的,只是概率很小了。
所以当shmget建立共享内存出错,主要会有两中原因:
第一种:共享内存不足
第二种:构建的key唯一性不足
下面我们就来实现一下把
用一个common.hpp 放共同有的值
ProcessA.cc
这里我们介绍一个查看共享内存的指令
ipcs -m
接下来,我们编译运行,通过ipc -m 指令查看,确实存在一个和程序打印的shmid和key同样的共享内存
但是,我们发现在ProcessA进程推出之后,共享内存也并没有释放,在此运行代码之后,确报错了
这说明了什么?
说明了共享内存的生命周期是随内核的! 用户不主动关闭,共享内存会一直存在。除非内核重启或用户主动释放
ipcrm -m <shmid>,其中<shmid>是要删除的共享内存段的标识符。
但是奇特的是,当我们再次重新运行代码之后,结果却发生变化了
shmid由0变为了1,这说明了什么?
这说明shmid 本质上也是一个数组下标。
那么,既然shmid和key都能标识共享内存,那它们的区别是什么? 为什么还要有shmid来标识共享内存?
shmid vs key
shmid: 只给用户使用的一个标识shm的标示符 类似于fd,FIFE*
key: 只作为内核中,区分shm唯一性的标示符,不作为用户管理shm的 id值 类似于 文件描述符表中,struct file的地址。
接下来,我们来简单分析下 ipcs -m 所显示的属性分别是什么?
这里,key,shmid就不用多说了,owner就是谁创建的它,perms 是权限,也就是文件的权限,我们那里没有设,所以是0,bytes 是共享内存的大小。nattach是关联数,也就是有几个进程和这个共享内存相关联,最后一个status 显然就是这个共享内存的状态了。
如果说我们想给共享内存,设置权限,只需要在shmget函数中的第三个参数中,加上权限即可。
共享内存只需要由一方来创建,另一方直接获取使用就可以了
接下来,我们再优化下接口
共享内存能创建了,下一步,就是将共享内存挂接到进程的共享区,让进程看到。
那么,该如何挂接呢? 使用shmat函数
shmat的第二个参数就是告诉系统把共享内存挂到共享区的哪一个位置,我们直接传nullptr即可,让系统自己来决定就可以了。第三个参数,它的两个可能取值是SHM_RND和SHM_RDONLY,我们正常就写0就行。返回值是系统实际挂接共享内存的虚拟地址的起始地址。
- shmaddr为NULL,核⼼⾃动选择⼀个地址
- shmaddr不为NULL且shmflg⽆SHM_RND标记,则以shmaddr为连接地址。
- shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会⾃动向下调整为SHMLBA的整数倍。
- 公式:shmaddr - (shmaddr % SHMLBA)
- shmflg=SHM_RDONLY,表⽰连接操作⽤来只读共享内存
而shmat下面的shmdt函数,是用来解除进程和共享内存关联的。
注意:将共享内存段与当前进程脱离不等于删除共享内存段。
它的用法很简单,只需要把shmat的返回值也就是挂载共享内存的虚拟地址空间的起始地址传给它就可以了。
那我们如何直接释放物理内存呢? 用函数shmctl
shmctl的第二个参数是选项,告诉系统你是要释放共享内存,还是要修改共享内存的大小等等。第三个参数是获取共享内存的属性,拷贝出来进行查看
cmd:
好了,方法了解清楚了,接下来把代码补充一下
ProcessA.cc
编译运行,并启动监控脚本,我们就能看到共享内存属性nattach的变化,由0变为1,再由1变为0
下面,我们将通信部份加上,并且完善ProcessB.cc
ProcessA.cc
ProcessB.cc
编译运行,ProcessA和ProcessB就可以正常通信了
ProcessB.cc源文件中,我们通信的方式,可以更加简便一些。一旦有共享内存挂载到自己的地址空间中,你可以直接把它当作是自己的空间使用
如果我们想要查看,共享内存的属性,我们可以通过shmctl函数操作
共享内存的内核数据结构本节开头处有
编译之后,就可以看到属性被打印出来了。
3.6 共享内存的特性
我们总结一下,共享内存的特性。可以简单的理解为三点:
1. 共享内存没有同步互斥之类的保护机制
2. 共享内存是所有进程间通信中,速度最快的。原因在于,共享内存的拷贝次数比较少
3. 共享内存内部的数据,由用户自己维护
3.7 共享内存加上命名管道实现互斥机制
共享内存,没有互斥机制。可我们希望它具有安全性,被保护起来,如果客户端没有输入,服务器就阻塞等到我们的信息,而不是一直向显示器打印,所以我们需要给他加上互斥机制。
那我们怎么加上互斥机制呢?我们还没学过互斥锁,但是我们学过命名管道呀!命名管道具有互斥机制,我们可以使用命名管道,来通知对方,传输信息使用共享内存。
搞清了思路,实现起来就很简单了。
我们可以使用上一节,我们所写的创建销毁管道的代码,直接复制到Common.hpp即可
Common.hpp
可以在Common.hpp中,直接写两个函数,然后在ProcessA,ProcessB中直接使用
ProcessA.cc
ProcessB.cc
编译运行,之后就可以实现同步互斥通信了
4. System V 消息队列
- 消息队列的本质:一个进程向另外一个进程发送有类别数据块的方法
- 每个数据块都认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
- 特性方面
- IPC资源必须删除,否则不会自动清除,除非重启,所以System V IPC 资源的生命周期随内核
4.1 消息队列的原理
假设有A和B两个进程,通过消息队列进行通信。如果A进程给B进程发信息,A进程给B进程发送的消息,就会挂到消息队列中。如果说,B进程给A进程发送信息,B进程发送的信息也会被挂到消息队列中。这样一来,消息队列中就有了一个一个的节点,我们称之为数据块。
A和B进程都往队列里放数据,那我们怎么去做区分呢?所以,在数据块中会有一个字段用来标记类型,是A的数据块还是B的数据块。
消息队列的原理,可以总结为两点:
- 必须让不同进程看到同一个队列
- 允许不同进程,向内核中发送带类型的数据块
进程间通信的本质是看到同一份资源,这份资源以什么样的方式让我们看到,决定了这种方式是上面,是管道或者说文件缓存区,是共享内存,还是消息队列
操作系统中,会有很多的进程使用消息队列进行通信,消息队列不止一个。操作系统需不需要将它们管理起来?当然。那怎么管理呢?先描述,再组织!
接下来,我们来看看和消息队列相关的函数吧
4.2 消息队列相关函数
创建消息队列
msgget 函数用来创建消息队列
它的第一个参数有没有点眼熟,这不就是我们创建共享内存时用的key嘛。它们两者是相同的,都是通过ftok函数获取的。
msgget的第二个参数,是填选项,和创建共享内存的方式相同。创建新的消息队列传IPC_CREAT | IPC_EXCL,如果是存在则获取,不存在则创建,只传IPC_CREAT即可。
如果消息队列我们创建好了,但是不想用了,想把它释放掉,那怎么办?
释放消息队列
msgctl函数释放消息队列,第一个参数就是msgget创建消息队列后的返回值,也就是这个消息队列的id功能和key的区别都类似于共享内存那谈的。第二个参数,这里和上面共享内存那的值都一样,填IPC_RMID 即可删除
第三个参数,是描述消息队列使用的结构体。如果只想要删除的话,这里可以填nullptr即可,
如果想要获取,第二个参数可以传IPC_STAT就可以获取了,msg结构体如下图
- msg_cbyes表示队列的总字节数是多少
- msg_qnum表示队列中有多少个数据块
- msg_qbytes 表示队列中一个数据块的最大字节数
- msg_lspid表示最近发送数据块的进程pid
- msg_lrpid表示最近收到数据块的进程pid
发送消息
发送消息用到的函数是msgsnd。第一个参数,向指定的消息队列发送消息。第四个参数设置为0,以阻塞方式发送消息。第二个参数,数据块的起始地址。第三个参数,数据块的大小。
下面这个就是我们将来要发送的数据块,名字随便取。mtype是数据块的类型,必须大于0。mtext是消息的内容。
接收消息
接收消息,用到函数msgrcv。前三个,就不必多说了,第四个参数,传数据块的类型,告诉它,你要接受的数据,是A类型还是B类型的。第五个参数,和msgsnd函数一样,设置为0即可。
使用 ipcs -q 指令,可以查看已创建的消息队列
用 ipcrm -q msgid 指令,也可以删除相应的消息队列
5. System V 信号量
5.1 信号量相关函数
主要用于同步和互斥的
申请信号量使用semget函数,semget函数可以一次申请多个信号量。
这里注意:多个信号量和信号量是不同的概念
第一个参数和第三个参数不用说和上面信号队列和共享内存功能一样,第二个参数,表示指定信号量集中信号量的数量。当创建新的信号量集时,需要指定信号量的个数;如果是获取已存在的信号量集,则该参数值应该与信号量集创建时指定的信号量数量一致。
信号量的控制使用semctl函数
第一个和第三个参数同之前的一致,这里第二个参数表示要操作的信号量中的某个具体信号量的编号,编号从0开始。当使用一些对整个信号量集操作的命令时,semnum 可以设置为0
semctl还可以初始化信号量。第三个参数传IPC_SET,然后,第四个参数传一个自己定义的联合体,联合体格式如下:
下面这个是描述信号量属性的结构体
至此,我们发现信号量、共享内存和消息队列,它们的相似度很高,都是在同一套体系下运行的,结构体中的第一个成员都是 struct ipc_perm XXX_perm。 而struct ipc_perm 这样的字段里面包含的内容都是一样的
信号量的具体使用,我们后面配合代码进行讲解
下面,我们先补充下几点概念
5.2 并发编程,概念铺垫
- 多个执行流(进程),能看到的同一份公共资源 :共享资源
- 被保护起来的资源 叫做临界资源
- 常见保护的方式 :互斥与同步
- 任何时刻,只允许一个执行流访问资源,叫做互斥
- 多个执行流,访问临界资源的时候,具有一定的顺序性,叫做同步
- 系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源
- 在进程中涉及到互斥资源的程序叫做临界区。
- 你写的代码 = 访问临界资源的代码(临界区)+ 不访问临界资源的代码(非临界区)
- 所谓的对公共资源进行保护,本质是对访问共享资源的代码进行保护
这里,我们举几个例子说明下
A进程向共享内存写入数据,hello world。A进程刚写完hello,还没写完。但B进程就进来读取了,只读取了一部分。
我们是想给B进程发送hello world,可B进程只拿走了一部分,导致发送方和接受方数据不一致的现象,我们称之为数据不一致问题。
解决数据访问的方式是加锁,加锁的具体细节我们后面的文章细说。这里我们只需要理解,加锁,意味着互斥访问,任何时刻,只允许一个执行流访问共享资源,这种状态我们称之为互斥。
而共享的,任何时刻只允许一个执行流访问(就是访问执行的代码)的资源,我们称之为临界资源。临界资源,一般都是内存空间。比如说管道,管道具有互斥功能,你访问管道,是在访问文件缓冲区,文件缓冲区的本质就是内存空间。
共享内存是不是临界资源呢?不是,虽然它是共享的,但是它没有做保护,保证互斥。
假设现在有100行代码,有5行代码会对共享资源进行访问。我们把这5行访问临界资源的代码,称之为临界区。
思考一个问题:如果说多个进程或多个线程向显示器打印内容,会出现什么现象?
显示器上显示的信息是错乱的,混乱的,和命令混合在一起。为什么?多个进程或线程向显示器打印,首先得看到同一个显示器文件。多个进程向同一个文件写入,也就是向文件缓冲区写入数据。文件缓冲区,此时作为共享资源,并没有做保护,你一句,我一句,当然会出现混乱啦!
5.3 理解信号量
信号量,是英译过来的名字,也有人翻译为信号灯。信号量本质上是一把计数器,类似于int cnt =n, 但两者并不等价。
举个例子,放映厅中有100个座位,对应的有100张电影票。放映厅老板能不能把票加到101,当然不能,座位总共就100张。当我们去看电影的时候,先做什么?是不是先买票,预定一个位置。每卖出一张票,电影票的数量就减一。当电影票的数量减为0,也就没有座位了。
座位是放映厅的一种资源,买票的本质是对座位资源的预定。每卖一张票,资源的数量就减一。当资源的数量减为0,也就意味着资源已被申请完毕了。
临界资源其实就相当于例子中的放映厅中的座位,临界资源的数量可能是n,我们需要用一个计数器 int cnt =15, 来统计临界资源的数量。如果有资源被申请了,我们就cnt--,当cnt减为0,就表示资源被申请完了,再有执行流来申请,就不给你。
程序员把这个计数器,称之为信号量。
如果说放映厅只有一个座位,那么我们只需要一个为1的计数器即可。
只有一个座位,对应只有一张票,一张票只有一个人能抢到,只有一个人能到放映厅里看电影。看电影期间,只能有一个执行流访问临界资源,这不就是互斥吗?
我们把值只能为0,1两态的计数器,称之为二元信号量。二元信号量本质上就是一个锁。
为什么计数器为1呢? 因为资源的数量为1。也可以理解为,我们把临界资源当成一个整体,整体申请,整体释放。
这里有四个点需要明确
1. 申请计数器成功,就表示我就具有访问资源的权限
2. 申请了计数器资源,我当前访问了我要的资源吗?没有,申请了计数器资源是对资源的预定机制
3.计数器可以有效保证进入共享资源的执行流的数量
4.所以,每一个执行流,想要访问共享资源中的一部分的时候,不是直接访问,而是先申请计数器资源。看电影先买票!
接着思考一个问题:要访问临界资源,先要申请信号量计数器资源,信号量计数器不也是共享资源吗?
申请信号量计数器资源,要进行的cnt--。可cnt--并不是安全的,在C语言上,这是一条语句,但变成汇编,它至少需要3条汇编语句。
1. cnt变量的内存,内存->cpu 寄存器
2. cpu内进行--操作
3. 将计算结果写回cnt变量的内存位置
在执行以上这三条汇编语句的时候,进程可能随时被切换。这部分是有问题的,后面的文件会展开讲解。
保护别人,首先得保证自己的安全!
申请信号量,本质是对计数器--,我们称之为P操作
释放资源,释放信号量,本质是对计数器进行++操作,我们称之为V操作
要想保证字节的安全,就要保证申请和释放的PV操作都是原子的。
什么是原子的? 就是做一件事,只会处于两种状态,要么不做,要么做完,没有 “正在做” 这一状态。
说了这么多,简单总结,其实就四句话:
1. 信号量本质是一把计数器,执行PV操作且均是原子的。
2. 执行流申请资源,必须先申请信号量资源,得到信号量子厚,才能访问临界资源!
3. 信号量值1,0两态的,二元信号量,就是互斥功能
4. 申请信号量的本质:是对临界资源的预定机制!!
基于这些知识,我们就可以看研究信号量如何设置了
5.4 信号量设置
设置信号量用函数semop,它的第一个参数是信号量标识符
第二个参数,是需要传一个结构体,这个结构体有如下三个成员,
第一个成员sem_num,我们一次可以申请多个信号量,你要操作哪一个信号量,通过这个变量告诉它。如果我们只申请了一个信号量,那么填0就可以了,这和数组下表是一样的。
第二个成员sem_op,表示对信号量的操作类型,这里我们填{-1,0,1} 这三种值就行,填-1表示要--,也就是执行P操作(获取资源),这里如果信号量的值在减完之后小于0,那么调用进程将会堵塞住,1则是++,也就是执行V操作(释放资源)0,则表示测试操作,检查信号量的值是否为0,若不为0,则调用进程将会阻塞,知道信号量为0或信号量集被删除
第三个成员sem_flg 表示操作的标志,其值主要有两个
IPC_NOWAIT:如果设置了该标志,上面所说的操作无法执行,导致进程需要阻塞时,semop函数将立刻返回,进程不会陷入阻塞,返回值为-1,并且设置errno 为EAGAIN
SEM_UNDO:如果设置了该标志,当进程终止时,系统会自动撤销该进程对信号量的操作,以防止进程异常终止时导致的信号量操作未完成而引起的死锁等问题。
第三个参数,表示 sops数组中元素的个数,即需要执行操作的数量
信号量是System V 接口中最复杂的,所以实践部分,我们将会在多线程部分操作说明。
思考一个问题: 信号量凭什么是进程间通信的一种?
主要基于一下两点:
- 通信不仅仅是为了我们进行传输数据,也可以是互相协同
- 要协同,本质也是通信,信号量首先就要被所有通信的进程看到!!
6. System V 实现IPC的本质
接下来我们探究下 System V是如何实现IPC的? 和管道为什么不同呢?
这里,我们会从两个角度进行探讨
1. 应用角度,看IPC属性
在上一小节中,我们也发现了,共享内存,消息队列,信号量它们在IPC实现上有高度的相似,所采用的函数都是换汤不换药,底层所采用的结构体ipc_perm更是相同的,
所以我们可以推测: 在OS层面上,IPC是同类资源!
2. 从内核角度,看IPC结构
IPC资源一定是全局的资源,能够被所有进程看到,这样才能保证,不同的进程访问到的是同一个资源
那么全局的IPC资源在内核中是如何进行管理的呢?
在内核中会有一个全局的结构体类型 ipc_ids,里面会存放一个entries指针, 这个entries指针会指向一个ipc_id_ary的表,这个表主要存放一个柔性指针数组p,而这个柔性指针数组是 kern_ipc_perm指针类型的,kern_ipc_perm看的会不会很眼熟?
这个类型就是共享内存,信号量,消息队列结构体的第一个元素,所以,ipc_id_ary表中的柔性指针数组p的元素,指向的就是每一个共享内存、信号量、消息队列结构体的起始位置,通过这个指针数组指向所有IPC资源,这也就说明数组下标实际上就是之前的id,也就是XXXget的返回值,消息队列、共享内存、信号量,用key区分唯一性
p指针数组在存储时,可以将类型强转存储,使用时也可以强转类型使用,系统可以通过这个指针访问所有以ipc_perm类型的基础设计的结构体,这一特性很像什么?
多态!!!
没错,这就是用C语言实现多态的方法,ipc_perm为基类,msg_perm,,shmid_perm,semid_perm 为子类
那么现在还有个问题,我们在访问IPC资源的时候,P指针数组元素会发生强转,那我们怎么知道是信号量,共享内存,还是消息队列访问的元素呢?
其实在老的Linux版本中,这里的解决方案是,会有三个结构体类型为ipc_ids的全局变量,这个变量分别用于存储信号量、共享内存、消息队列,但是它们的entries指针会指向同一个表,这样也就维护了唯一性。
在新的Linux版本中呢,这里则是改为了一个类型为ipc_ids的结构体数组,大小为3,再将信号量、共享内存、消息队列,分别宏定义映射下标存储
在使用上,我们发现,信号量和消息队列都是采用的系统调用接口,来完成操作的,只有共享内存是直接通过地址使用的,
那么,为什么共享内存是可以直接使用的呢?
我们知道在进程结构体中有指向内存的结构体mm_struct指针,而其实在mm_struct 中会存在多个指向vm_area_struct的指针,vm_area_struct就是用于虚拟映射的结构体,主要用于共享内存,动态库的映射。
vm_area_struct 中存在 struct file* vm_file 变量用于文件属性映射,标识该区域所映射的文件是什么,还存在 vm_start 和vm_end 这两个用于标识物理地址对应虚拟地址的映射,这样内存块也就有了。
而且这种使用方式是不占用文件描述符的,因为这片区域不是用户打开的,所以不用把文件描述符给你,是通过文件映射找到的。
那么,我们能否自己打开一个文件,不使用文件描述符,不使用open和write函数了,而是用类似于这种文件映射 vm_start,vm_end 的方式找到呢?
当然可以,这里来介绍一个接口 mmap,这个其实也是共享内存,不过是 POSIX 表标准的共享内存接口,底层原理还是将 vm_file指向我们打开的文件,vm_start 和vm_end 映射物理地址
mmap映射文件的简单demo
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#define FILE_SIZE 4096
int main() {
int fd;
void *map_ptr;
char *file_path = "example_file.txt";
// 打开文件,如果文件不存在则创建
fd = open(file_path, O_RDWR | O_CREAT, 0644);
if (fd == -1) {
perror("open");
return 1;
}
// 扩展文件大小
if (ftruncate(fd, FILE_SIZE) == -1) {
perror("ftruncate");
close(fd);
return 1;
}
// 使用 mmap 映射文件
map_ptr = mmap(NULL, FILE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (map_ptr == MAP_FAILED) {
perror("mmap");
close(fd);
return 1;
}
// 对映射区域进行操作
// 例如,写入一些数据
char *data = "Hello, mmap!";
sprintf((char *)map_ptr, "%s", data);
// 确保数据被写入文件
if (msync(map_ptr, FILE_SIZE, MS_SYNC) == -1) {
perror("msync");
munmap(map_ptr, FILE_SIZE);
close(fd);
return 1;
}
// 打印映射区域的内容
printf("Mapped file content: %s\n", (char *)map_ptr);
// 解除映射
if (munmap(map_ptr, FILE_SIZE) == -1) {
perror("munmap");
close(fd);
return 1;
}
// 关闭文件
if (close(fd) == -1) {
perror("close");
return 1;
}
return 0;
}
原文地址:https://blog.csdn.net/m0_54443558/article/details/144594830
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!