自学内容网 自学内容网

实现epoll事件的两种模型(ET/LT)、epoll反应堆模型

前置知识:

多进程/线程并发服务器、多路I/O转接服务器的简单实现-CSDN博客

1. 事件模型

EPOLL事件有两种模型:

  • Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
  • Level Triggered (LT) 水平触发只要有数据都会触发。

思考如下步骤:

1. 假定我们已经把一个用来从管道中读取数据的文件描述符(rfd)添加到epoll描述符。

2. 管道的另一端写入了2KB的数据

3. 调用epoll_wait,并且它会返回rfd,说明它已经准备好读取操作

4. 读取1KB的数据

5. 调用epoll_wait……

在这个过程中,有两种工作模式:

1.1 ET模式

ET模式即Edge Triggered工作模式。

如果我们在第1步将rfd添加到epoll描述符的时候使用了EPOLLET标志(epoll_ctl函数参2),那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

1) 基于非阻塞文件句柄

  • 就比实例一中父进程中read改为readn要求一次读500个字节,父进程pfd[0]设置的是ET模式,子进程向pfd[1]管道写了498个字节,父进程epoll_wait触发,readn只读取498未满500,就一直阻塞等待,而没法重新进入下一轮循环触发epoll_wait(如果子进程在父进程读取498个数据的时候及时又补上了2个字节那就不会导致readn一直在那阻塞,又或者把传入readn的pfd[0]通过fcntl改为非阻塞,即使未到达readn要求的字节数也不会阻塞)

2) 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

缓冲区剩余未处理完的数据不会导致epoll_wait返回。(新的事件满足才会被触发)

struct epoll_event event;

event.events = EPOLLIN|EPOLLET;

1.2 LT模式

LT模式即Level Triggered工作模式。

与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。

缓冲区剩余未处理完的数据会导致epoll_wait返回

比较

LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。(fcntl可以设置fd的阻塞或非阻塞)在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).

1.3 实例一

基于管道epoll ET触发模式

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>

#define MAXLINE 10

int main(int argc, char* argv[])
{
    int efd, i;
    int pfd[2];
    pid_t pid;
    char buf[MAXLINE], ch = 'a';

    pipe(pfd);
    pid = fork();
    if (pid == 0) {   // 子写
        close(pfd[0]);   // 子进程这本关闭读端
        while (1) {
            //aaaa\n
            for (i = 0; i < MAXLINE / 2; i++)
                buf[i] = ch;
            buf[i - 1] = '\n';
            ch++;
            //bbbb\n
            for (; i < MAXLINE; i++)
                buf[i] = ch;
            buf[i - 1] = '\n';
            ch++;
            //aaaa\nbbbb\n
            write(pfd[1], buf, sizeof(buf));
            sleep(2);
        }
        close(pfd[1]);
    }
    else if (pid > 0) {   //父进程 读
        struct epoll_event event;
        struct epoll_event resevent[10];
        int res, len;
        close(pfd[1]);

        efd = epoll_create(10);
        /* event.events = EPOLLIN; */
        event.events = EPOLLIN | EPOLLET;  //读事件 | ET边缘触发事件        
        /* ET 边沿触发 ,默认是水平触发 */
        event.data.fd = pfd[0];
        epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);  //管道读端的文件描述符添加进efd树中

        while (1) {
            res = epoll_wait(efd, resevent, 10, -1);  //阻塞等待事件的触发并返回事件触发的已经就绪的文件描述符
            printf("res %d\n", res);  //打印是否为pfd[0]
            if (resevent[0].data.fd == pfd[0]) {
                len = read(pfd[0], buf, MAXLINE / 2);  
                //只读取一半出来 -->管道中还剩下bbbb\n
                //由于对pfd[0]设置的是ET触发,因此进入下一轮while循环时,如果子进程没写东西进管道,epoll_wait是不会触发的
                //也就是ET模式下缓冲区残留的数据是不会触发epoll_wait
                //如果子进程又写入cccc\n,那么epoll_wait就会被触发,但此时处理的却是上一次残留在缓冲区的bbbb\n
                write(STDOUT_FILENO, buf, len);
            }
        }
        readn的情况下:
        /*
        int flag = fcntl(pfd[0],F_GETEL);   //获取pfd[0]的信息属性
        flag |= O_NONBLOCK;  //设置为非阻塞
        fcntl(connfd,F_SETFL,flag);  //修改pfd[0]属性
         while (1) {
            res = epoll_wait(efd, resevent, 10, -1);  //阻塞等待事件的触发并返回事件触发的已经就绪的文件描述符
            printf("res %d\n", res);  //打印是否为pfd[0]
            if (resevent[0].data.fd == pfd[0]) {

                len = readn(pfd[0], buf, 500);  //要求一次读取500字节,肯定不够,必会阻塞
                //父进程中read改为readn要求一次读500个字节,父进程pfd[0]设置的是ET模式,
                //子进程向pfd[1]管道写了498个字节,父进程epoll_wait触发,readn只读取498未满500,
                //就一直阻塞等待,而没法重新进入下一轮循环触发epoll_wait(如果子进程在父进程读取498个数据的时候
                //及时又补上了2个字节那就不会导致readn一直在那阻塞,又或者把传入readn的pfd[0]通过fcntl改为非阻塞,
                //即使未到达readn要求的字节数也不会阻塞)
                //因此要把pfd[0]设置为非阻塞  

                write(STDOUT_FILENO, buf, len);
            }
        }
        */                                   
        close(pfd[0]);
        close(efd);
    }
    else {
        perror("fork");
        exit(-1);
    }
    return 0;
}

如果不重新产生设置fd时的某个事件,上一次处理完数据后缓冲区中残留的数据在下一轮循环中不会被epoll_wait触发,fd不会被传出进行处理

如果触发了设置fd的某个事件,就会去处理上次fd残留在缓冲区的数据,而这次触发的fd的某个事件产生在缓冲区的数据,如果不设置好数据互通的处理方式,就只能留到等下次触发了fd事件时才能被处理

1.4 实例二

基于网络C/S模型的epoll ET触发模式

server:

/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
    struct sockaddr_in servaddr, cliaddr;
    socklen_t cliaddr_len;
    int listenfd, connfd;
    char buf[MAXLINE];
    char str[INET_ADDRSTRLEN];
    int i, efd;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);

    bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

    listen(listenfd, 20);

    struct epoll_event event;
    struct epoll_event resevent[10];
    int res, len;
    efd = epoll_create(10);
    event.events = EPOLLIN | EPOLLET;  /* ET 边沿触发 ,默认是水平触发 */

    printf("Accepting connections ...\n");
    cliaddr_len = sizeof(cliaddr);
    connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddr_len);
    printf("received from %s at PORT %d\n",
           inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
           ntohs(cliaddr.sin_port));
    //设置句柄为非阻塞
    int flag = fcntl(connfd,F_GETEL);   //获取connfd的信息属性
    flag |= O_NONBLOCK;  //设置为非阻塞
    fcntl(connfd,F_SETFL,flag);  //修改connfd属性

    event.data.fd = connfd;
    epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

    while (1) {  //只监听一个connfd
        res = epoll_wait(efd, resevent, 10, -1);
        printf("res %d\n", res);
        if (resevent[0].data.fd == connfd) {
            len = read(connfd, buf, MAXLINE / 2);
            write(STDOUT_FILENO, buf, len);
        }     
    }
    return 0;
}

client: 

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char* argv[])
{
    struct sockaddr_in servaddr;
    char buf[MAXLINE];
    int sockfd, i;
    char ch = 'a';

    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
    servaddr.sin_port = htons(SERV_PORT);

    connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

    while (1) {
        for (i = 0; i < MAXLINE / 2; i++)
            buf[i] = ch;
        buf[i - 1] = '\n';
        ch++;

        for (; i < MAXLINE; i++)
            buf[i] = ch;
        buf[i - 1] = '\n';
        ch++;

        write(sockfd, buf, sizeof(buf));
        sleep(10);
    }
    Close(sockfd);
    return 0;
}

1.5 实例三

基于网络C/S非阻塞模型的epoll ET触发模式

server:

/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
    struct sockaddr_in servaddr, cliaddr;
    socklen_t cliaddr_len;
    int listenfd, connfd;
    char buf[MAXLINE];
    char str[INET_ADDRSTRLEN];
    int i, efd;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);

    bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

    listen(listenfd, 20);

    struct epoll_event event;
    struct epoll_event resevent[10];
    int res, len;
    efd = epoll_create(10);
    event.events = EPOLLIN | EPOLLET;  /* ET 边沿触发 ,默认是水平触发 */

    printf("Accepting connections ...\n");
    cliaddr_len = sizeof(cliaddr);
    connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddr_len);
    printf("received from %s at PORT %d\n",
           inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
           ntohs(cliaddr.sin_port));
    //设置句柄为非阻塞
    int flag = fcntl(connfd,F_GETEL);   //获取connfd的信息属性
    flag |= O_NONBLOCK;  //设置为非阻塞
    fcntl(connfd,F_SETFL,flag);  //修改connfd属性

    event.data.fd = connfd;
    epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

    while (1) {  //只监听一个connfd
        res = epoll_wait(efd, resevent, 10, -1);
        printf("res %d\n", res);
        if (resevent[0].data.fd == connfd) {
            len = read(connfd, buf, MAXLINE / 2);
            write(STDOUT_FILENO, buf, len);
        }     
    }
    return 0;
}

client:

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char *argv[])
{
    struct sockaddr_in servaddr;
    char buf[MAXLINE];
    int sockfd, i;
    char ch = 'a';
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
    servaddr.sin_port = htons(SERV_PORT);

    connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

    while (1) {
        for (i = 0; i < MAXLINE/2; i++)
            buf[i] = ch;
        buf[i-1] = '\n';
        ch++;

        for (; i < MAXLINE; i++)
            buf[i] = ch;
        buf[i-1] = '\n';
        ch++;

        write(sockfd, buf, sizeof(buf));
        sleep(10);
    }
    Close(sockfd);
    return 0;
}

2. epoll反应堆模型

epoll ET模式 + 非阻塞 + void *ptr

2.1 void *ptr

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):
参四结构体的成员data的成员变量

void* ptr在一定程度可以代替int fd(共用体):

struct evt{
   int fd;
   void* (func)(int a);
}* ptr;
void *ptr设置为struct evt*类型的结构体指针,包含了fd已经回调函数func,而这个回调函数又称为epoll的反应堆

反应堆既要监听读事件也要监听写事件的,当读事件触发时,读取完后想写给客户端,但在网络中是不能随便写的,需要判断条件是否充足,比如滑动窗口win缓冲区大小是否足够写等,因此需要让void *ptr设置回调函数去处理写事件

2.2 实例 -- recv

#include<stdio.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<arpa/inet.h>
#include<fcntl.h>
#include<errno.h>
#include<string.h>
#include<stdlib.h>
#include<time.h>
#define MAX_EVENTS 1024  //监听上限
#define BIFLEN 4096
#define SERV_PORT 8080  //端口号

void recvdata(int fd, int events, void *arg);   
void senddata(int fd, int events, void *arg);

/*描述就绪文件描述符相关信息*/
struct myevent_s{
    int fd;  //要监听的文件描述符
    int events;  //对应的监听事件
    void *arg;  //泛型参数 --struct myevent_s *,arg指向整个结构体的首地址,也就是存着strcut myevent_s
    void(*call_back)(int fd, int events, void *arg);   //回调函数 -- 反应堆,lfd和cfd不同
    //lfd的回调函数是acceptconn -- 用处理客户端的请求链接
    //cfd的回调函数recvdata和sendata,用于处理读事件和写事件 -- recvdata读取完后会修改回调函数为sendata用于写数据到缓冲区
    int status;  //是否在监听:1 --在红黑树上(监听); 0 --不在(不监听)
    char buf[BBUFLEN]; //缓冲区
    int len;  
    long last_active;  //记录每次加入红黑树 g_efd 的事件值
};  //void *ptr,文件描述符的信息以及反应堆call_back回调函数

int g_efd;  //全局变量,保存epoll_create返回的文件描述舒服
struct myevent_s g_events[MAX_EVENTS+1];  //自定义结构体类型数组,+1 --> listen fd
//最后一位存储lfd,前面的都是存储cfd的void *ptr -- struct myevent_s

/*将结构体myevent_s成员变量初始化 -- 参1,是数组g_events的成员*/
void evenset(struct myevent_s *ev, int fd, void(*call_back)(int,int,void*), void *arg);  //arg和ev实际上是一回事
{  //把监听套接字lfd放在了g_events数组的最后一位
    ev->fd = fd;
    ev->call_bakc = call_back;
    ev->events = 0;
    ev->arg = arg; //泛型参数,arg指向ev自己的整体,其结构体数据存在g_events数组当中
    ev->status = 0;  //是否监听
    meset(ev->buf, 0, sizeof(ev->buf));  //清空缓冲区,最好是添加个判断
    ev->len = 0;  
    ev-last_active = time(NULL);  //调用even函数事件
    return;
}  //ptr中的内容 -- 反应堆的初始化

/*对myevent_s数组中新到来的epoll_data_t data中的cfd设置新的挂点
    添加到epoll监听的红黑树,是不是新到来的通过status判断*/
//evenradd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{
    //设置树的挂点:epoll_data_t data和events事件
    struct epoll_event epv = {0, {0}};
    int op;
    epv.data.ptr = ev;  //void *ptr:ev和ptr指向的相同的空间,给挂点的data中的ptr赋值上 
    epv.events = ev->events = events;  //EPOLLIN 或者 EPOLLOUT
    //ev和ptr指向相同的空间,
    if(ev->status == 0){  //查看是否在红黑树g_efd上,0表示不在
        op = EPOLL_CTL_ADD;  //将其加入红黑树,并且将status置为1    
        ev->status = 1;
    }
    if(epoll_ctl(efd,op,ev->fd,&epv)< 0)   //实际添加/修改
        printf("event add failed [fd=%d],events[%d]\n", ev->fd, events);
    else
        printf("event add ok [fd=%d],op=%d, events[%X]\n", ev->fd, op, events);
    return ;

}  //创建节点,将反应堆模型赋值到epoll_event节点上(data.ptr),然后再将节点挂到树上

/*创建 socket,初始化lfd,放在可g_events数组的最后一位 */
void initlistensocket(int efd, short port)
{
    struct sockaddr_in sin;
    int lfd=Socket(AF_INET,SOCK STREAM, 0);
    fcntL(lfd,F_SETFL,O_NONBLOCK); //将socket设为非阻塞

    memset(&sin,0,sizeof(sin));  //bzero(&sin, sizeof(sin))
    sin.sin_family = AF_INET;
    sin.sin_addr.s addr = INADDR_ANY;
    sin.sin_port = htons(port);

    bind(lfd,(struct sockaddr *)&sin, sizeof(sin));
    listen(lfd,20);

    /* 33行 void eventset(struct myevent_s *ev, int fd, void (*call back)(int, int, void *), void *arg); */
    eventset(&g_events[MAX_EVENTS],lfd, acceptconn, &g_events[MAX_EVENTS]);
    //初始化lfd的epoll_data_t data中的void *ptr的成员变量,回调函数是acceptconn
    //g_events在定义时候MAX_EVENTS+1了,因此这里的参1其实就是数组最后一位的地址
    //acception是lfd的回调函数(反应堆),用于处理客户端的请求连接和初始化cfd的epoll_data_t data中的void *ptr的成员变量以及添加到红黑树

    /*void eventadd(int efd, int events, struct myevent s *ev)*/
    eventadd(efd,EPOLLIN,&g_events[IMAX EVENTS]);
    //将客户端挂到g_efd红黑树上

    return;
}

/*从epoll 监听的 红黑树中删除一个 文件描述符*/
void eventdel(int efd, struct myevent_s *ev){   //ev指向g_events数组对应的元素
    struct epoll_event epv ={0,{0}};
    if(ev->status != 1)  //不在红黑树上
        return ;
    epv.data.ptr = ev;  //ev的地址赋值给了epv.data.ptr
    //epv.data.ptr = NULL;
    //对ptr指向的内容进行清空,也就是将该cfd在g_events数组中对应的元素进行清除。这里省略
    //也可以不用,因为下面对元素的status赋值为0,其它为清除的等有新的客户端前来也会被覆盖掉
    ev->status =0;  //修改状态

    epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);//从红黑树 efd 上将 ev->fd 摘除

    return ;
}

/*当有文件描述符就绪,epoll返回,调用该函数 与客户端建立链接*/
//有客户端请求连接,在main函数中epoll_wait传出的数组events包含lfd,通过调用回调函数进行处理
void acceptconn(int lfd,int events, void *arg){
    struct sockaddr in cin;
    socklen tlen =sizeof(cin);
    int cfd, i;

    if((cfd=accept(lfd,(struct sockaddr *)&cin,&len))== -1){
        if(errnO !=EAGAIN && errno != EINTR){
            /*暂时不做出错处理 */
        }
        printf("%s:accept,%s\n",__func__,strerror(errno));  //__func__当前函数的名字
        return;
    }
    do {
        for(i=0;i<MAX EVENTS; i++)  //从全局数组g_events中找一个空闲元素
            if(gevents[i].status ==0)  //类似于select中设计的client[]数组中找值为-1的元素
                break;  //跳出 for

    if(i ==MAX EVENTS){
        printf("%s: max connect limit[%d]\n",__funC__,MAX_EVENTS);
        break;   //跳出do while(0)不执行后续代码
    }

    int flag = 0;
    if((flag =fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0 ){  //将cfd也设置为非阴寒
        printf("%s:fcntl nonblocking failed, %s\n",__func__,strerror(errno));
        break;
    }

    /*给cfd设置一个 myevent_s 结构体,回调函数 设置为 recvdata */
    eventset(&g_events[i],cfd,recvdata,&g_events[i]);  //参四让arg指向了g_event[i],再赋值给ptr中的arg指向g_events[i],相当于指向了自己ptr
    eventadd(g_efd,EPOLLIN,&g_events[i]);  //将cfd添加到红黑树g_efd中,监听读事件

    } while(0);
    printf("new connect[%s:%d][time:%ld], pos[%d]\n"inet_ntoa(cin.sin addr),ntohs(cin.sin port),g events[i].last active, i);
    return;
}

//有读事件 -> 读取完后 -> cfd从g_efd上摘除下来:eventdel -> 更改回调函数为sendata
//-> 改为写事件 -> 重新挂到红黑树上 -> sendata将新的数据写给客户端
void recvdata(int fd, int events, void *arg){
    struct myevent_s *ev =(struct myevent_s *)arg;  //指向g_events数组当中的对应的数据的
    int len;

    //读文件描述符,数据存入myevent_s成员buff中
    len =recv(fd,ev->buf, sizeof(ev->buf),0);
    //代替read用于网络通信当中,第四个参数1传0表示实际读,传MSG_PEEK则是模拟读

    eventdel(g_efd, ev);//将该节点从红黑树上摘除,并对应在g_events中的元素的stauts状态改为0,表示未监听
    if(len >0){
        ev->len = len;
        ev->buf[len]='\0';  //手动添加字符串结束标记
        printf("c[%d]:%s\n",fd,ev->buf);
        eventset(ev,fd,senddata,ev)//设置该 cfd 对应的回调函数为 senddata
        eventadd(g_efd, EPOLLOUT, ev);  //将cfd加入红黑树g_efd中,监听其写事件

        }else if(len ==0){
            close(ev->fd);
            /*ev-g_events 地址相减得到偏移元素位置 */
            printf("[fd=%d] pos[%ld],closed\n",fd, ev-g_events);
         }else {
             close(ev->fd);
             printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
         }
    return;
}


void senddata(int fd, int events, void *arg){
    struct myevent_s *ev =(struct myevent_s *)arg;
    int len;

    len=send(fd,ev->buf,ev->len,0); //直接将数据 回写给客户端。未作处理
    eventdel(g_efd, ev); //从红黑树g_efd中移除

    if(len >0){
        printf("send[fd=%d],[%d]%s\n",fd, len, ev->buf);
        eventset(ev,fd,recvdata,ev); //将该fd的 回调函数改为 recvdata
        eventadd(g_efd, EPOLLIN,ev);//从新添加到红黑树上,设为监听读事件

    } else {
    //关闭链接
    close(ev->fd);
    printf("send[fd=%d] error %s\n",fd, strerror(errno));
    return ;
}


int main(int argc,char*argv[])
    unsigned short port = SERV PORT;
    if(argc ==2)
        port = atoi(argv[1]);    //使用用户指定端口.如未指定,用默认端口
    g_efd = epoll_create(MAX EVENTS+1);  //创建红黑树,返回给全局 g_efd
    if(g_efd <=0)
        printf("create efd in %s err %s\n",_func_,strerror(errno));

    initlistensocket(g_efd, port); //初始化监听socket

    struct epoll_event events[MAX EVENTS+1];  //保存已经满足就绪事件的文件描述符数组
    printf("server running:port[%d]\n",port)

    int checkbos =0,i;
    while(1){
    /*超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */

        long now = time(NULL);  //当前时间
        for(i=0;i<100;i+,checkpos++){  //一次循环检测100个。使用checkpos控制检测对象
            if(checkpoS == MAX EVENTS)
                checkpos =0;
            if(g_events[checkpos].status != 1)  //不在红黑树 g_efd 上
            continue;
        //客户端在一定时间内不活跃就将其踢掉
        long duration =nowg_events[checkpos].last_active;  //客户端不活跃的世间
        if(duration>=60){
             close(g_events[checkpos].fd);//关闭与该客户端链接
             pfintf("[fd=%d] timeout\n",g_events[checkpos].fd);
             eventdel(g_efd, &g_events[checkpos]);//将该客户端 从红黑树 g_efd移除
        }
        /*监听红黑树g_efd,将满足的事件的文件描述符加至events数组中,1秒没有事件满足,返回 0*/
        int nfd = epoll_wait(g_efd,events, MAX EVENTS+1,1000);  //1000是毫秒单位 -- 1秒
        //客户端的请求连接处理省略,添加if语句判断event中有没有lfd然后调用回调函数即可
        //调用回调函数acceptconn(int lfd,int events, void *arg):
        //arg传:(强转)(event[0].data.ptr)->arg,将自己传给回调函数
        //这么传其实和传g_events[最大] -- lfd放在了最后一位没啥区别,不过如果是调用cfd的回调函数就不太方便,因为在g_events数组中的位置不好确定
        //events[i]知道了客户端,通过其data.ptr->arg就可以知道在g_events中的数据,传给回调函数就能对挂点进行处理和删除更改为写监听

        if(nfd < 0){
            printf("epoll_wait error, exit\n");
            break;
        }
        for(i=0; i< nfd; i++){
        /*使用自定义结构体myevent_s类型指针,接收 联合体data的void *ptr成员*/
            struct myevent_s *ev =(struct myevent_s*)events[i].data.ptr;  //注意强转,prt指向全局结构体g_events[i]
            //epoll_wait传出数组events[i] fd的事件要和fd的*ev结构体中的事件相同
            if((events[i].events & EPOLLIN)&&(ev->events & EPOLLIN))  //读就绪事件
                ev->call_back(ev->fd,events[i].events,ev->arg);  //调用*ptr中的回调函数
                //调用从cfd对应的myevents_s结构体g_events[i] == ev中的的回调函数,且将ev->arg自己传给ev自己的成员回调函数,让回调函数对ev进行操作 -- 自己给了成员
                //其实参4ev->arg和传events[i].data.ptr没啥区别,都是g_events[i],只不过ptr比arg更高一级(ptr->arg),而arg和call_back都是成员
                //成员变量arg传给成员回调函数call_back做参数不会显得太突兀,实际上还是将g_events[i]传给了call_back
                //其最终目的都是为了处理该cfd的读请求,然后将其从g_efd上去掉和将status更改为0,再改为写监听添加到g_efd
                //Lfd EPOLLIN
            //读完后将cfd的回调改为写回调sendata和将事件改为写事件,在下一次epoll_wait监听到将数据写给客户端
            if((events[i].events & EPOLLOUT)&&(ev->events & EPOLLOUT))  //写就绪事件
                ev->call_back(ev->fd,events[i].events,ev->arg);
        }
    }
    //退出前释放所有资源
    return 0;
}

2.3 ptr成员和g_events的关系

eventset是设置好void* ptr -- struct myevent_s中的相关内容((struct epoll_event).data.ptr),eventadd是创建struct epoll_event节点,将eventset创建的反应堆赋值到节点上

了解代码前要先搞清除挂点的data和events成员,data中的ptr和fd成员,ptr中的结构体成员 -- arg、call_back等,arg和ptr实际上是一回事

为什么传arg给回调函数而不直接传ptr呢?可以看代码251行后的注释


原文地址:https://blog.csdn.net/caiji0169/article/details/142686766

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!