自学内容网 自学内容网

《Linux高性能服务器编程》笔记04

Linux高性能服务器编程

本文是读书笔记,如有侵权,请联系删除。

参考

Linux高性能服务器编程源码: https://github.com/raichen/LinuxServerCodes

豆瓣: Linux高性能服务器编程

第09章I/O复用

I/O复用使得程序能同时监听多个文件描述符,这对提高程序的性能至关重要。通常,网络程序在下列情况下需要使用I/0复用技术:

客户端程序要同时处理多个socket。比如本章将要讨论的非阻塞connect技术。

客户端程序要同时处理用户输入和网络连接。比如本章将要讨论的聊天室程序。

TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合。后 续章节将展示很多这方面的例子。

服务器要同时处理TCP请求和UDP请求。比如本章将要讨论的回射服务器。

服务器要同时监听多个端口,或者处理多种服务。比如本章将要讨论的xinetd 服务器。

需要指出的是,I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依次处理其中的每 一个文件描述符,这使得服务器程序看起来像是串行工作的。如果要实现并发,只能使用多进程或多线程等编程手段。Linux下实现I/O复用的系统调用主要有select、poll和epoll,本章将依次讨论之,然后介绍使用它们的几个实例。

9.1 select系统调用

select系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。本节先介绍select系统调用的API,然后讨论 select判断文件描述符就绪的条件,最后给出它在处理带外数据中的实际应用。

9.1.1 select API

select系统调用的原型如下:

#include <sys/select.h>
int select( int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds,
           struct timeval* timeout );

1)nfds参数指定被监听的文件描述符的总数。它通常被设置为select 监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。

2)readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。应用程序调用select函数时,通过这3个参数传入自己感兴趣的文件描述符。select调用返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。这3个参数是fd_set结构指针类型。fd_set结构体的定义如下:

在这里插入图片描述

由以上定义可见,fd_set结构体仅包含一个整型数组,该数组的每个元素的每一位(bit) 标记一个文件描述符。fd_set能容纳的文件描述符数量由FD_SETSIZE指定,这就限制了 select 能同时处理的文件描述符的总量。

由于位操作过于烦琐,我们应该使用下面的一系列宏来访问fd_set结构体中的位:

在这里插入图片描述

3)timeout参数用来设置select函数的超时时间。它是一个timeval结构类型的指针,采 用指针参数是因为内核将修改它以告诉应用程序select等待了多久。不过我们不能完全信任 select调用返回后的timeout值,比如调用失败时timeout值是不确定的。

timeval结构体的定 义如下:

struct timeval
{
    1ong tv_sec; /*秒数*/ 
    long tv_usec; /*微秒数*/
};

由以上定义可见,select给我们提供了一个微秒级的定时方式。如果给timeout变量的 tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则 select将一直阻塞,直到某个文件描述符就绪。select 成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没 有任何文件描述符就绪,select将返回0。select失败时返回-1并设置errno。如果在select 等待期间,程序接收到信号,则select 立即返回-1,并设置 errno为EINTR。

select 函数用于在一组文件描述符上等待某个事件发生,可以等待多个文件描述符上的可读、可写或异常等事件。该函数的参数包括:

  • nfds:监视的文件描述符集合中最大的文件描述符值加1。
  • readfdswritefdsexceptfds:分别是用于监视可读、可写、异常事件的文件描述符集合。
  • timeout:是一个指向 struct timeval 结构体的指针,用于设置超时时间。如果为 NULL,表示一直等待,直到有事件发生。

函数返回时,可以通过检查文件描述符集合来确定哪些文件描述符上发生了事件。以下是一个简单的示例代码:

#include <sys/select.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main() {
    // 创建文件描述符集合
    fd_set read_fds;
    
    // 初始化文件描述符集合
    FD_ZERO(&read_fds);
    
    // 添加标准输入文件描述符到集合
    FD_SET(STDIN_FILENO, &read_fds);

    // 设置超时时间为5秒
    struct timeval timeout;
    timeout.tv_sec = 5;
    timeout.tv_usec = 0;

    // 调用select等待事件
    int result = select(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout);

    if (result == -1) {
        perror("select");
        exit(EXIT_FAILURE);
    } else if (result == 0) {
        printf("No data within 5 seconds.\n");
    } else {
        if (FD_ISSET(STDIN_FILENO, &read_fds)) {
            printf("Data is available on standard input.\n");
            // 读取标准输入的数据
            char buffer[1024];
            ssize_t bytesRead = read(STDIN_FILENO, buffer, sizeof(buffer));
            if (bytesRead > 0) {
                printf("Read %zd bytes: %.*s\n", bytesRead, (int)bytesRead, buffer);
            } else {
                perror("read");
            }
        }
    }

    return 0;
}

在上述代码中,select 用于等待标准输入上的可读事件,如果在5秒内有数据可读,则会输出相应的信息。

9.1.2文件描述符就绪条件

哪些情况下文件描述符可以被认为是可读、可写或者出现异常,对于select的使用非常关键。

在网络编程中,下列情况下socket可读:

  1. socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时 我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。

  2. socket通信的对方关闭连接。此时对该socket的读操作将返回0。

  3. 监听 socket 上有新的连接请求。

  4. socket 上有未处理的错误。此时我们可以使用 getsockopt 来读取和清除该错误。

下列情况下 socket 可写:

  1. socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
  2. socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
  3. socket使用非阻塞connect连接成功或者失败(超时)之后。
  4. socket 上有未处理的错误。此时我们可以使用getsockopt 来读取和清除该错误。

网络程序中,select能处理的异常情况只有一种:socket上接收到带外数据。下面我们详细讨论之。

9.1.3处理带外数据

上一小节提到,socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。代码清单9-1描述了select是如 何同时处理二者的。

9-1use_select.cpp

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
    // 检查命令行参数
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);
    printf("ip is %s and port is %d\n", ip, port);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    // 创建监听套接字
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    // 绑定地址信息
    ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    assert(ret != -1);

    // 监听连接
    ret = listen(listenfd, 5);
    assert(ret != -1);

    // 接受连接
    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof(client_address);
    int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
    if (connfd < 0)
    {
        printf("errno is: %d\n", errno);
        close(listenfd);
    }

    // 打印连接信息
    char remote_addr[INET_ADDRSTRLEN];
    printf("connected with ip: %s and port: %d\n", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;

    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);

    int nReuseAddr = 1;
    setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));

    while (1)
    {
        memset(buf, '\0', sizeof(buf));
        FD_SET(connfd, &read_fds);
        FD_SET(connfd, &exception_fds);

        // 监视连接套接字上的可读和异常事件
        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
        printf("select one\n");
        if (ret < 0)
        {
            printf("selection failure\n");
            break;
        }

        // 处理可读事件
        if (FD_ISSET(connfd, &read_fds))
        {
            ret = recv(connfd, buf, sizeof(buf) - 1, 0);
            if (ret <= 0)
            {
                break;
            }
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        // 处理异常事件
        else if (FD_ISSET(connfd, &exception_fds))
        {
            ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
            if (ret <= 0)
            {
                break;
            }
            printf("get %d bytes of oob data: %s\n", ret, buf);
        }
    }

    // 关闭连接套接字和监听套接字
    close(connfd);
    close(listenfd);
    return 0;
}

此代码是一个简单的TCP服务器程序,监听指定的IP地址和端口,接受客户端连接后,通过 select 函数监视连接套接字上的可读和异常事件。当有数据可读时,通过 recv 函数接收数据并打印;当有带外数据(OOB)时,同样通过 recv 函数接收并打印。

对于带外数据的解释:

带外数据(Out-of-Band Data,简称OOB数据)是在TCP通信中的一种特殊数据传输机制。在正常的数据传输过程中,数据按照先后顺序被传送,但带外数据允许在正常数据之外传送一些紧急的、优先级较高的信息。

TCP的带外数据机制提供了一种通知机制,使得通信的一方可以向对方发送一些额外的信息,而不会影响正常的数据传输顺序。带外数据通常用于紧急通知或优先级较高的控制信息。

在使用TCP时,带外数据的发送和接收通常通过 sendrecv 函数的 MSG_OOB 标志来实现。发送端通过 send 函数发送带外数据,接收端通过 recv 函数的 MSG_OOB 标志来接收带外数据。

在上述的代码示例中,通过设置套接字选项 SO_OOBINLINE,将带外数据与普通数据一起接收,然后通过 recv 函数的 MSG_OOB 标志处理带外数据。

9.2 poll系统调用

poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。poll的原型如下:

#include <pol1.h>
int poll( struct pollfd* fds, nfds_t nfds, int timeout );

1)fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd结构体的定义如下:

struct pollfd
{
    int fd; /*文件描述符*/
    short events;/*注册的事件*/
    short revents;  /*实际发生的事件,由内核填充*/
};

其中,fd成员指定文件描述符;events成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或:revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。poll 支持的事件类型如表9-1所示。

在这里插入图片描述

表9-1中,POLLRDNORM、POLLRDBAND、POLLWRNORM、POLLWRBAND由XOPEN规范定义。它们实际上是将POLLIN事件和POLLOUT事件分得更细致,以区别对待普通数据和优先数据。但Linux并不完全支持它们。

通常,应用程序需要根据recv调用的返回值来区分socket上接收到的是有效数据还是对方关闭连接的请求,并做相应的处理。不过,自Linux内核2.6.17开始,GNU为poll系统调 用增加了一个POLLRDHUP事件,它在socket上接收到对方关闭连接的请求之后触发。这为我们区分上述两种情况提供了一种更简单的方式。但使用POLLRDHUP事件时,我们需要在代码最开始处定义_GNU SOURCE。

2)nfds参数指定被监听事件集合fds的大小。其类型nfds t的定义如下: typedef unsigned long int nfds_t;

3)timeout参数指定poll的超时值,单位是毫秒。当timeout为-l时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。poll系统调用的返回值的含义与select相同。

poll 函数用于监视多个文件描述符,以查看它们是否准备好进行 I/O 操作。它提供了对 select 的更灵活替代,并可以高效处理大量文件描述符。

以下是正确的头文件和使用 poll 的示例:

#include <poll.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
    struct pollfd fds[1];
    int timeout = 5000;  // 超时时间,单位为毫秒

    // 初始化要监视的文件描述符
    fds[0].fd = /* 您的文件描述符 */;
    fds[0].events = POLLIN;  // 监视可读性

    // 调用 poll
    int result = poll(fds, 1, timeout);

    if (result == -1) {
        perror("poll");
        exit(EXIT_FAILURE);
    } else if (result == 0) {
        printf("发生超时。\n");
    } else {
        if (fds[0].revents & POLLIN) {
            printf("数据准备好读取。\n");
            // 处理读取操作中的数据
        }
        // 如果需要,可以检查其他事件
    }

    return 0;
}

在这个例子中,poll 用于监视具有指定超时的单个文件描述符的可读性。代码检查文件描述符是否准备好进行读取,并可以根据需要扩展以处理其他事件。

9.3 epoll系列系统调用

9.3.1内核事件表

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大差异。首 先,epoll使用一组函数来完成任务,而不是单个函数。其次,epoll把用户关心的文件描述 符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传 入文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的 这个事件表。这个文件描述符使用如下epoll_create函数来创建:

#include <sys/epoll.h> 
int epol1_create( int size )

size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回 的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。

下面的函数用来操作epoll的内核事件表:

#include <sys/epoll.h>
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event )

fd参数是要操作的文件描述符,op参数则指定操作类型。操作类型有如下3种:

EPOLL_CTL_ADD,往事件表中注册fd上的事件。

EPOLL_CTL_MOD,修改fd上的注册事件。

EPOLL_CTL_DEL,删除fd上的注册事件。

event参数指定事件,它是epoll_event结构指针类型。

epoll_event 的定义如下:

struct epoll_event
{
    __uint32_t events; /* epoll事件*/ 
    epoll_data_t data; /*用户数据 */
};

其中events 成员描述事件类型。epoll支持的事件类型和poll基本相同。表示epoll事件 类型的宏是在poll对应的宏前加上“E”,比如epoll的数据可读事件是EPOLLIN。但epoll有 两个额外的事件类型—EPOLLET和EPOLLONESHOT。它们对于epoll的高效运作非常关 键,我们将在后面讨论它们。data成员用于存储用户数据,

其类型epoll_data_t的定义如下:

typedef union epoll_data
{
    void* ptr;
    int fd;
    uint32_t u32;
    uint64_t u64; 
} epoll_data_t;

epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标 文件描述符。ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t是一个联合体, 我们不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来 (正如8.5.2小节讨论的将句柄和事件处理器绑定一样),以实现快速的数据访问,只能使用其他手段,比如放弃使用epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。

epoll_ctl成功时返回0,失败则返回-1并设置errmo。

9.3.2 epoll_wait 函数

epoll系列系统调用的主要接口是epoll wait 函数。它在一段超时时间内等待一组文件描 述符上的事件,其原型如下:

#include <sys/epoll.h>
int epoll_wait( int epfd, struct epoll_event* events, 
               int maxevents,int timeout );

该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno。关于该函数的参数,我们从后往前讨论。timeout参数的含义与poll接口的timeout参数 相同。maxevents参数指定最多监听多少个事件,它必须大于0。

epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指 定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出 epoll_wait 检测到 的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出 内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。代码清单 9-2体现了这个差别。

在这里插入图片描述

pollepoll 都是用于实现 I/O 多路复用的机制,但它们在使用上有一些差异。主要的区别包括性能和触发方式。

1. 性能差异:

  • poll 在文件描述符较多的情况下性能较差,因为它是线性扫描的,时间复杂度为O(n)。
  • epoll 使用事件通知的方式,对于大量的文件描述符能够更高效地处理,时间复杂度为O(1)。

2. 触发方式:

  • poll 是水平触发(Level Triggered)的,当文件描述符中的数据准备好时,每次调用 poll 都会通知。
  • epoll 支持水平触发和边缘触发(Edge Triggered),可以选择性地只在状态变化时通知。

以下是简单的使用示例:

使用 poll 的示例:

#include <poll.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
    struct pollfd fds[1];
    int timeout = 5000;  // 超时时间,单位为毫秒

    // 初始化要监视的文件描述符
    fds[0].fd = /* 您的文件描述符 */;
    fds[0].events = POLLIN;  // 监视可读性

    // 调用 poll
    int result = poll(fds, 1, timeout);

    if (result == -1) {
        perror("poll");
        exit(EXIT_FAILURE);
    } else if (result == 0) {
        printf("发生超时。\n");
    } else {
        if (fds[0].revents & POLLIN) {
            printf("数据准备好读取。\n");
            // 处理读取操作中的数据
        }
        // 如果需要,可以检查其他事件
    }

    return 0;
}

使用 epoll 的示例:

#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main() {
    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = /* 您的文件描述符 */;

    // 将文件描述符添加到 epoll 监听
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, /* 您的文件描述符 */, &event) == -1) {
        perror("epoll_ctl");
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[1];
    int timeout = 5000;  // 超时时间,单位为毫秒

    // 调用 epoll_wait
    int num_events = epoll_wait(epoll_fd, events, 1, timeout);

    if (num_events == -1) {
        perror("epoll_wait");
        close(epoll_fd);
        exit(EXIT_FAILURE);
    } else if (num_events == 0) {
        printf("发生超时。\n");
    } else {
        if (events[0].events & EPOLLIN) {
            printf("数据准备好读取。\n");
            // 处理读取操作中的数据
        }
        // 如果需要,可以检查其他事件
    }

    close(epoll_fd);
    return 0;
}

上述示例仅演示基本用法,实际使用中需要根据具体场景和需求进行适当的修改。

9.3.3LT和ET模式

epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较 高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以 ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此 事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait时,cpoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于 采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应 用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait 调用将不再向应用程序 通知这一事件。可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。代码清单9-3体现了LT和ET在工作方式上的差异。

9-3mtlt.cpp

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
// 设置文件描述符为非阻塞模式
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// 添加文件描述符到 epoll 监听
void addfd(int epollfd, int fd, bool enable_et) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et) {
        event.events |= EPOLLET;  // 边缘触发模式
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

// 使用水平触发模式处理事件
void lt(epoll_event* events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            // 处理新的连接请求
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            addfd(epollfd, connfd, false);  // 使用水平触发模式
        } else if (events[i].events & EPOLLIN) {
            // 有数据可读
            printf("event trigger once\n");
            memset(buf, '\0', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
            if (ret <= 0) {
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        } else {
            printf("something else happened \n");
        }
    }
}

// 使用边缘触发模式处理事件
void et(epoll_event* events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            // 处理新的连接请求
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            addfd(epollfd, connfd, true);  // 使用边缘触发模式
        } else if (events[i].events & EPOLLIN) {
            // 有数据可读
            printf("event trigger once\n");
            while (1) {
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if (ret < 0) {
                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                } else if (ret == 0) {
                    close(sockfd);
                } else {
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        } else {
            printf("something else happened \n");
        }
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        lt( events, ret, epollfd, listenfd );
        //et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}

9.3.4 EPOLLONESHOT事件

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序 中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket上的数据后开 始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触 发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个 socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注 册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有 机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个 线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这 个 socket。

代码清单9-4展示了EPOLLONESHOT事件的使用。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

struct fds {
    int epollfd;
    int sockfd;
};

// 设置文件描述符为非阻塞模式
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// 添加文件描述符到 epoll 监听
void addfd(int epollfd, int fd, bool oneshot) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;  // 默认使用边缘触发模式
    if (oneshot) {
        event.events |= EPOLLONESHOT;  // 使用 EPOLLONESHOT 保证一个 socket 连接在任意时刻只被一个线程处理
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

// 重置文件描述符上的 EPOLLONESHOT 事件
void reset_oneshot(int epollfd, int fd) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

// 工作线程,负责处理某个 socket 上的数据
void* worker(void* arg) {
    // 获取传递给线程的参数,包含要处理的 socket 文件描述符和 epoll 文件描述符
    int sockfd = ((fds*)arg)->sockfd;
    int epollfd = ((fds*)arg)->epollfd;
    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE];
    memset(buf, '\0', BUFFER_SIZE);

    // 循环读取数据
    while (1) {
        // 使用 recv 从 sockfd 接收数据
        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
        if (ret == 0) {
            // 连接关闭,关闭 sockfd 并打印信息
            close(sockfd);
            printf("foreigner closed the connection\n");
            break;
        } else if (ret < 0) {
            // 非阻塞模式下,如果没有数据可读,errno 为 EAGAIN,重新设置 EPOLLONESHOT 事件并退出循环
            if (errno == EAGAIN) {
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
        } else {
            // 成功接收到数据,打印数据内容并休眠 5 秒
            printf("get content: %s\n", buf);
            sleep(5);
        }
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
}

int main(int argc, char* argv[]) {
    if (argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    // 创建监听 socket
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    // 绑定地址和端口,监听连接
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    // 添加监听 socket 到 epoll 中
    addfd(epollfd, listenfd, false);

    // 循环等待事件
    while (1) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0) {
            printf("epoll failure\n");
            break;
        }

        // 处理每个事件
        for (int i = 0; i < ret; i++) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd) {
                // 处理新的连接请求,添加新的连接到 epoll 中
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                addfd(epollfd, connfd, true);
            } else if (events[i].events & EPOLLIN) {
                // 有数据可读,创建一个工作线程来处理
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create(&thread, NULL, worker, (void*)&fds_for_new_worker);
            } else {
                printf("something else happened \n");
            }
        }
    }

    // 关闭监听 socket
    close(listenfd);
    return 0;
}

从工作线程函数worker来看,如果一个工作线程处理完某个socket上的一次请求(我们用休眠5s来模拟这个过程)之后,又接收到该socket上新的客户请求,则该线程将继续 为这个socket服务。并且因为该socket上注册了EPOLLONESHOT事件,其他线程没有机会接触这个socket,如果工作线程等待5s后仍然没收到该socket 上的下一批客户数据,则 它将放弃为该socket服务。同时,它调用reset_oneshot 函数来重置该socket上的注册事件, 这将使epoll有机会再次检测到该socket 上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。

由此看来,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务。这就保证了连接的完整性,从而避免了很多可能的竞态条件。

9.4 三组I/O复用函数的比较

前面我们讨论了select、poll和epoll三组I/O复用系统调用,这3组系统调用都能同时监听多个文件描述符。它们将等I待由timeout参数指定的超时时间,直到一个或者多个文件描述符上有事件发生时返回,返回值是就绪的文件描述符的数量。返回0表示没有事件发生。现在我们从事件集、最大支持文件描述符数、工作模式和具体实现等四个方面进一步比较它们的异同,以明确在实际应用中应该选择使用哪个(或哪些)。

这3组函数都通过某种结构体变量来告诉内核监听哪些文件描述符上的哪些事件,并使用该结构体类型的参数来获取内核处理的结果。

select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select需要提供3个这种类型的参数来分别传入和输出可读、可写及异常等事件。这一方面使得select不能处理更多类型的事件,另 一方面由于内核对fd_set集合的在线修改,应用程序下次调用select 前不得不重置这3个fd set集合。

poll的参数类型pollfd则多少“聪明”一些。它把文件描述符和事件都定义其中, 任何事件都被统一处理,从而使得编程接口简洁得多。并且内核每次修改的是pollfd结构体 的revents成员,而events成员保持不变,因此下次调用poll 时应用程序无须重置 pollfd类型的事件集参数。由于每次select和poll调用都返回整个用户注册的事件集合(其中包括就 绪的和未就绪的),所以应用程序索引就绪文件描述符的时间复杂度为O(n)。

epoll则采用 与select和poll完全不同的方式来管理用户注册的事件。它在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除、修改事件。这样,每次epoll wait调用都直接从该内核事件表中取得用户注册的事件,而无须反复从用户空间读入这些事 件。epoll_wait系统调用的events参数仅用来返回就绪的事件,这使得应用程序索引就绪文 件描述符的时间复杂度达到O(1)。

poll和epoll_wait 分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件。 这两个数值都能达到系统允许打开的最大文件描述符数目,即65535(cat/proc/sys/fs/file- max)。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制, 但这可能导致不可预期的后果。

select和poll都只能工作在相对低效的LT模式,而epoll则可以工作在ET高效模式。 并且epoll还支持EPOLLONESHOT事件。该事件能进一步减少可读、可写和异常等事件被 触发的次数。

从实现原理上来说,select和poll采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。epoll_wait则不同,它采用的是回调的方式。内核检测到就绪 的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪 事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此epoll wait无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度是O(1)。 但是,当活动连接比较多的时候,epoll_wait 的效率未必比 select和poll高,因为此时回调函 数被触发得过于频繁。所以epoll_wait 适用于连接数量多,但活动连接较少的情况。

最后,为了便于阅读,我们将这3组I/O复用系统调用的区别总结于表9-2中。

在这里插入图片描述

9.5 I/O复用的高级应用一:非阻塞connect

这段话描述了connect 出错时的一种errno值:EINPROGRESS。这种错误发生在对非阻塞的socket调用connect,而连接又没有立即建立时。在这种情况下, 我们可以调用select、poll等函数来监听这个连接失败的socket上的可写事件。当select、 poll等函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。如果错误码是0,表示连接成功建立,否则连接失败。通过上面描述的非阻塞connect方式,我们就能同时发起多个连接并一起等待。下面看看非阻塞connect的一种实现,如代码清单9-5所示。

9-5unblockconnect.cpp

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <string.h>

#define BUFFER_SIZE 1023

// 设置套接字为非阻塞模式
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// 非阻塞 connect 函数
int unblock_connect(const char* ip, int port, int time)
{
    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int sockfd = socket(PF_INET, SOCK_STREAM, 0);
    int fdopt = setnonblocking(sockfd);
    
    // 尝试发起连接
    ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
    
    if (ret == 0)
    {
        // 连接成功
        printf("Connect with server immediately\n");
        fcntl(sockfd, F_SETFL, fdopt);
        return sockfd;
    }
    else if (errno != EINPROGRESS)
    {
        // 非阻塞连接不被支持
        printf("Unblock connect not supported\n");
        return -1;
    }

    fd_set readfds;
    fd_set writefds;
    struct timeval timeout;

    FD_ZERO(&readfds);
    FD_SET(sockfd, &writefds);

    timeout.tv_sec = time;
    timeout.tv_usec = 0;

    // 使用 select 等待连接建立
    ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
    if (ret <= 0)
    {
        // 连接超时
        printf("Connection time out\n");
        close(sockfd);
        return -1;
    }

    if (!FD_ISSET(sockfd, &writefds))
    {
        // 未在 sockfd 上发现事件
        printf("No events on sockfd found\n");
        close(sockfd);
        return -1;
    }

    int error = 0;
    socklen_t length = sizeof(error);

    // 检查连接是否成功
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0)
    {
        printf("Get socket option failed\n");
        close(sockfd);
        return -1;
    }

    if (error != 0)
    {
        // 连接失败
        printf("Connection failed after select with the error: %d \n", error);
        close(sockfd);
        return -1;
    }

    // 连接准备就绪
    printf("Connection ready after select with the socket: %d \n", sockfd);
    fcntl(sockfd, F_SETFL, fdopt);
    return sockfd;
}

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
        printf("Usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int sockfd = unblock_connect(ip, port, 10);
    if (sockfd < 0)
    {
        return 1;
    }
    
    // 关闭写端,模拟发送数据
    shutdown(sockfd, SHUT_WR);
    sleep(200);
    printf("Send data out\n");
    send(sockfd, "abc", 3, 0);
    //sleep(600);
    return 0;
}

注释:

  1. setnonblocking: 设置套接字为非阻塞模式的函数,返回旧的套接字选项。
  2. unblock_connect: 实现非阻塞 connect 的函数,尝试发起连接,如果连接成功,返回连接的套接字;如果连接正在进行中,使用 select 等待连接建立。
  3. main: 主函数,接收命令行参数,调用 unblock_connect 尝试进行非阻塞连接,之后关闭写端并模拟发送数据。

但遗憾的是,这种方法存在几处移植性问题。首先,非阻塞的socket 可能导致connect 始终失败。其次,select对处于EINPROGRESS状态下的socket可能不起作用。最后,对于 出错的socket,getsockopt在有些系统(比如Linux)上返回-1, 而在有些系统(比如源自伯克利的UNIX)上则返回0。这些问题没有一个统一的解决方法。

9.6 I/O复用的高级应用二:聊天室程序

像ssh这样的登录服务通常要同时处理网络连接和用户输入,这也可以使用I/O复用来实现。

本节我们以poll为例实现一个简单的聊天室程序,以阐述如何使用I/O复用技术来同时处理网络连接和用户输入。该聊天室程序能让所有用户同时在线群聊,它分为客户端和服务器两个部分。其中客户端程序有两个功能:一是从标准输入终端读入用户数据,并将用户数据发送至服务器;二是往标准输出终端打印服务器发送给它的数据。服务器的功能是接收 客户数据,并把客户数据发送给每一个登录到该服务器上的客户端(数据发送者除外)。下 面我们依次给出客户端程序和服务器程序的代码。

9.6.1客户端

客户端程序使用poll同时监听用户输入和网络连接,并利用splice函数将用户输入内容 直接定向到网络连接上以发送之,从而实现数据零拷贝,提高了程序执行效率。客户端程序代码如下。

9-6mytalk_client.cpp

#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>

#define BUFFER_SIZE 64

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    // 创建服务器地址结构体
    struct sockaddr_in server_address;
    bzero(&server_address, sizeof(server_address));
    server_address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &server_address.sin_addr);
    server_address.sin_port = htons(port);

    // 创建客户端套接字
    int sockfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(sockfd >= 0);

    // 尝试连接服务器
    if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0)
    {
        printf("Connection failed\n");
        close(sockfd);
        return 1;
    }

    // 创建 pollfd 数组,用于监视标准输入和 sockfd 上的事件
    pollfd fds[2];
    fds[0].fd = 0;          // 标准输入
    fds[0].events = POLLIN; // 监视读事件
    fds[0].revents = 0;
    fds[1].fd = sockfd;      // 客户端套接字
    fds[1].events = POLLIN | POLLRDHUP; // 监视读事件和对端连接断开事件
    fds[1].revents = 0;

    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    
    // 创建管道
    int ret = pipe(pipefd);
    assert(ret != -1);

    while (1)
    {
        // 使用 poll 等待事件发生
        ret = poll(fds, 2, -1);
        if (ret < 0)
        {
            printf("poll failure\n");
            break;
        }

        // 处理服务器断开连接事件
        if (fds[1].revents & POLLRDHUP)
        {
            printf("Server close the connection\n");
            break;
        }
        // 处理服务器发送的数据事件
        else if (fds[1].revents & POLLIN)
        {
            memset(read_buf, '\0', BUFFER_SIZE);
            recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
            printf("%s\n", read_buf);
        }

        // 处理标准输入事件
        if (fds[0].revents & POLLIN)
        {
            // 从标准输入读取数据,并通过管道发送给服务器
            ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
            ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
        }
    }

    // 关闭客户端套接字
    close(sockfd);
    return 0;
}

注释:

  1. server_address: 服务器地址结构体,存储要连接的服务器的 IP 地址和端口号。
  2. sockfd: 客户端套接字,用于与服务器建立连接和进行通信。
  3. fds[2]: pollfd 数组,用于监视标准输入和客户端套接字上的事件。
  4. pipefd[2]: 管道,用于在标准输入和客户端套接字之间传输数据。
  5. poll: 使用 poll 函数等待事件发生,处理标准输入、客户端套接字的读事件和对端连接断开事件。
  6. splice: 使用 splice 函数将标准输入的数据发送给服务器,并将服务器发送的数据写入标准输出。

9.6.2服务器

服务器程序使用poll同时管理监听 socket和连接socket,并且使用牺牲空间换取时间的 策略来提高服务器性能,如代码清单9-7所示。

9-7mytalk_server.cpp

#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

struct client_data
{
    sockaddr_in address;  // 存储客户端地址信息
    char* write_buf;      // 写缓冲区
    char buf[BUFFER_SIZE]; // 读缓冲区
};

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    // 初始化客户端数据数组和 pollfd 数组
    client_data* users = new client_data[FD_LIMIT];
    pollfd fds[USER_LIMIT + 1];
    int user_counter = 0;
    
    // 初始化 pollfd 数组,将监听套接字加入数组
    for (int i = 1; i <= USER_LIMIT; ++i)
    {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while (1)
    {
        // 使用 poll 等待事件发生
        ret = poll(fds, user_counter + 1, -1);
        if (ret < 0)
        {
            printf("poll failure\n");
            break;
        }

        // 处理监听套接字上的事件
        if ((fds[0].fd == listenfd) && (fds[0].revents & POLLIN))
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

            if (connfd < 0)
            {
                printf("errno is: %d\n", errno);
                continue;
            }

            // 处理连接数超过限制的情况
            if (user_counter >= USER_LIMIT)
            {
                const char* info = "too many users\n";
                printf("%s", info);
                send(connfd, info, strlen(info), 0);
                close(connfd);
                continue;
            }

            // 客户端连接数加1,设置连接套接字为非阻塞
            user_counter++;
            users[connfd].address = client_address;
            setnonblocking(connfd);
            fds[user_counter].fd = connfd;
            fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
            fds[user_counter].revents = 0;
            printf("comes a new user, now have %d users\n", user_counter);
        }
        else
        {
            // 处理其他连接上的事件
            for (int i = 1; i <= user_counter; ++i)
            {
                if (fds[i].revents & POLLERR)
                {
                    printf("get an error from %d\n", fds[i].fd);
                    char errors[100];
                    memset(errors, '\0', 100);
                    socklen_t length = sizeof(errors);

                    // 获取套接字错误信息
                    if (getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0)
                    {
                        printf("get socket option failed\n");
                    }
                    continue;
                }
                else if (fds[i].revents & POLLRDHUP)
                {
                    // 客户端断开连接,处理相关信息
                    users[fds[i].fd] = users[fds[user_counter].fd];
                    close(fds[i].fd);
                    fds[i] = fds[user_counter];
                    i--;
                    user_counter--;
                    printf("a client left\n");
                }
                else if (fds[i].revents & POLLIN)
                {
                    // 读取客户端数据
                    int connfd = fds[i].fd;
                    memset(users[connfd].buf, '\0', BUFFER_SIZE);
                    ret = recv(connfd, users[connfd].buf, BUFFER_SIZE - 1, 0);
                    printf("get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd);

                    if (ret < 0)
                    {
                        if (errno != EAGAIN)
                        {
                            close(connfd);
                            users[fds[i].fd] = users[fds[user_counter].fd];
                            fds[i] = fds[user_counter];
                            i--;
                            user_counter--;
                        }
                    }
                    else if (ret == 0)
                    {
                        printf("code should not come to here\n");
                    }
                    else
                    {
                        // 将客户端数据发送给其他连接的客户端
                        for (int j = 1; j <= user_counter; ++j)
                        {
                            if (fds[j].fd == connfd)
                            {
                                continue;
                            }

                            fds[j].events |= ~POLLIN;
                            fds[j].events |= POLLOUT;
                            users[fds[j].fd].write_buf = users[connfd].buf;
                        }
                    }
                }
                else if (fds[i].revents & POLLOUT)
                {
                    // 发送数据给客户端
                    int connfd = fds[i].fd;
                    if (!users[connfd].write_buf)
                    {
                        continue;
                    }
                    ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
                    users[connfd].write_buf = NULL;
                    fds[i].events |= ~POLLOUT;
                    fds[i].events |= POLLIN;
                }
            }
        }
    }

    delete[] users;
    close(listenfd);
    return 0;
}

工作流程:

  1. 创建监听套接字 listenfd,绑定地址并开始监听。
  2. 初始化客户端数据结构体数组 userspollfd 数组 fds
  3. fds 数组中将监听套接字 listenfd 加入,设置监听事件为 POLLINPOLLERR
  4. 进入主循环,使用 poll 等待事件发生。
  5. 处理监听套接字上的事件,如果有新的客户端连接,则接受连接,设置为非阻塞,并加入 fds 数组中。
  6. 处理其他连接上的事件,包括错误、对端连接断开、有数据可读、有数据可写。
  7. 对于数据可读的连接,将数据读取并发送给其他连接的客户端。
  8. 对于数据可写的连接,将数据发送给客户端。
  9. 循环处理事件,直到发生错误或中断。

该程序使用 poll 实现了一个简单的多客户端聊天室服务器,通过非阻塞套接字和事件驱动的方式处理多个连接,支持同时连接多个客户端。

9.7 I/O复用的高级应用三:同时处理TCP和UDP服务

至此,我们讨论过的服务器程序都只监听一个端口。在实际应用中,有不少服务器程序能同时监听多个端口,比如超级服务inetd和android的调试服务adbd。从bind系统调用的参数来看,一个socket只能与一个socket地址绑定,即一个socket 只能用来监听一个端口。因此,服务器如果要同时监听多个端口,就必须创建多个socket, 并将它们分别绑定到各个端口上。这样一来,服务器程序就需要同时管理多个监听socket, I/O复用技术就有了用武之地。另外,即使是同一个端口,如果服务器要同时处理该端口上 的TCP和UDP请求,则也需要创建两个不同的socket:一个是流socket,另一个是数据报socket,并将它们都绑定到该端口上。

比如代码清单9-8所示的回射服务器就能同时处理一 个端口上的TCP和UDP请求。

9-8multi_port.cpp

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024

// 设置套接字为非阻塞
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// 将文件描述符加入 epoll 事件监听集合
void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;

    // 初始化 TCP 套接字
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    // 初始化 UDP 套接字
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int udpfd = socket(PF_INET, SOCK_DGRAM, 0);
    assert(udpfd >= 0);

    ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    // 将 TCP 和 UDP 套接字加入 epoll 事件监听集合
    addfd(epollfd, listenfd);
    addfd(epollfd, udpfd);

    while (1)
    {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (number < 0)
        {
            printf("epoll failure\n");
            break;
        }

        for (int i = 0; i < number; i++)
        {
            int sockfd = events[i].data.fd;

            // 处理新的 TCP 连接
            if (sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                addfd(epollfd, connfd);
            }
            // 处理 UDP 数据
            else if (sockfd == udpfd)
            {
                char buf[UDP_BUFFER_SIZE];
                memset(buf, '\0', UDP_BUFFER_SIZE);
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);

                ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, &client_addrlength);
                if (ret > 0)
                {
                    sendto(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, client_addrlength);
                }
            }
            // 处理 TCP 数据
            else if (events[i].events & EPOLLIN)
            {
                char buf[TCP_BUFFER_SIZE];
                while (1)
                {
                    memset(buf, '\0', TCP_BUFFER_SIZE);
                    ret = recv(sockfd, buf, TCP_BUFFER_SIZE - 1, 0);
                    if (ret < 0)
                    {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                        {
                            break;
                        }
                        close(sockfd);
                        break;
                    }
                    else if (ret == 0)
                    {
                        close(sockfd);
                    }
                    else
                    {
                        send(sockfd, buf, ret, 0);
                    }
                }
            }
            else
            {
                printf("something else happened \n");
            }
        }
    }

    close(listenfd);
    return 0;
}

代码解释:

  1. setnonblocking: 将套接字设置为非阻塞模式。
  2. addfd: 将套接字加入 epoll 事件监听集合。
  3. 创建 TCP 套接字 listenfd,绑定并监听。
  4. 创建 UDP 套接字 udpfd,绑定。
  5. 创建 epoll 实例 epollfd,将 TCP 和 UDP 套接字加入监听集合。
  6. 进入主循环,调用 epoll_wait 等待事件发生。
  7. 处理监听套接字上的事件,有新的 TCP 连接则接受连接并加入监听。
  8. 处理 UDP 套接字上的事件,接收数据并发送给客户端。
  9. 处理其他连接上的事件,包括接收 TCP 数据和发送数据。
  10. 主循环中断后关闭套接字。

9.8超级服务xinetd

Linux因特网服务inetd是超级服务。它同时管理着多个子服务,即监听多个端口。现在 Linux系统上使用的inetd服务程序通常是其升级版本xinetd。xinetd程序的原理与inetd相同,但增加了一些控制选项,并提高了安全性。

xinetd(Extended Internet Services Daemon)是一个在 Unix 系统上运行的守护进程,用于管理网络服务的启动和停止。它是 inetd(Internet Services Daemon)的增强版本,提供了更多的功能和配置选项。以下是关于 xinetd 的详细介绍:

特点和功能:

  1. 服务管理: xinetd 主要负责启动和停止网络服务。它监听指定的端口,并根据客户端请求启动相应的服务。服务可以是任何可执行程序。

  2. 资源节省: 与常规的独立守护进程相比,xinetd 以超级服务器的形式运行。这意味着它仅在有连接请求时启动相应的服务,而不是一直运行。这有助于节省系统资源。

  3. 并发连接控制: xinetd 允许管理员限制同时连接到服务的客户端数量。这对于控制服务器资源的使用非常有用。

  4. Access Control: 提供了灵活的访问控制机制,可以根据 IP 地址、网络掩码、域名等对服务的访问进行限制。

  5. 日志记录: xinetd 支持详细的日志记录,可帮助管理员跟踪连接和服务的使用情况。

  6. 超时设置: 管理连接的超时设置,当连接在一定时间内没有活动时,xinetd 可以选择关闭连接。

  7. 重试设置: 允许管理员配置连接失败后的重试次数。

  8. 启动参数: 可以向服务传递额外的启动参数。

配置文件:

xinetd 的配置文件通常位于 /etc/xinetd.conf/etc/xinetd.d/ 目录下,文件名通常是服务的名字。配置文件采用简单的键值对格式。

示例配置文件(/etc/xinetd.d/telnet):

service telnet
{
    disable         = no
    socket_type     = stream
    wait            = no
    user            = root
    server          = /usr/sbin/in.telnetd
    log_on_failure  += USERID
}

常用命令:

  1. 启动 xinetd

    sudo service xinetd start
    
  2. 停止 xinetd

    sudo service xinetd stop
    
  3. 重启 xinetd

    sudo service xinetd restart
    

使用场景:

  • 简化服务管理: 对于系统管理员而言,xinetd 提供了一种简便的方式来启动和管理网络服务,而无需手动管理多个独立的守护进程。

  • 资源节省: 在资源有限的系统上,xinetd 可以帮助节省系统资源,因为它只在需要时启动服务。

  • 访问控制: xinetd 提供了强大的访问控制机制,允许管理员细粒度地控制服务的访问。

  • 监控和日志: 通过日志记录功能,xinetd 可以用于监控服务的使用情况,有助于及时发现和解决问题。

总体而言,xinetd 是一个灵活而功能强大的超级服务器,适用于需要管理多个网络服务的环境。

工作流程如下

在这里插入图片描述

后记

截至2024年1月20日20点32分,完成第九章的学习,了解了select,poll和epoll几个系统调用,对I/O复用的相关知识有了一定程度的了解。


原文地址:https://blog.csdn.net/shizheng_Li/article/details/135721412

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!