自学内容网 自学内容网

多路转接之select

一、认识select

系统提供select系统调用来实现多路转接(IO复用),select只负责IO中的等待,监视文件描述符的状态变化,哪一个文件描述符的资源就绪了,就通过read/write系统调用对就绪文件符做数据拷贝工作。

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);

第一个选项nfds表示select监视的多个文件描述符中最大文件描述符+1。这个选项缩小了select监视文件符的范围。

 struct timeval {
time_t      tv_sec;     /* seconds */
suseconds_t tv_usec;    /* microseconds */
};

最后一个选项timeout表示timeout->tv_sec时间内没有文件描述符的资源准备就绪,就一直阻塞等待,直到等待时间超过timeout->tv_sec才返回。
在timeout->tv_sec时间内有文件描述符资源准备就绪了,select就返回,timeout是一个输入输出型参数,返回后timeout中就是阻塞剩余的时间。
timeout=nullptr表示阻塞等待,有文件描述符的事件就绪才返回结果。
timeout={0,0}表示非阻塞等待,select立刻返回结果,。
timeout={5,0}在5s内没有文件描述符状态发生改变,阻塞等待5s后select返回结果,假设3s时有文件描述符状态改变,3s时select就返回结果,此时timeout={2,0}。
select返回值ret:
当ret>0时表示有ret个文件描述符的状态改变,ret个文件描述符的资源准备就绪。
ret=0时表示select超时返回,没有文件描述符的资源准备就绪。
ret<0时表示select调用失败。
文件描述符的状态改变 等价于 文件描述符的资源准备就绪 等价于 文件描述符的读写异常事件就绪

第二个第三个第四个选项readfds、writefds、exceptfds都是输入输出型参数,fd_set是位图结构。
以readfds为例:
输入的readfds表示用户告诉操作系统关心位图中为1的文件描述符的读事件的状态。
例如输入的readfds=01001101表示操作系统应该关心fd=0,fd=2,fd=3,fd=6的文件描述符的读事件的状态。
输出的readfds表示操作系统告诉用户位图为1 的文件描述符的读事件已经就绪。
例如输出的readfds=00001001表示操作系统告诉用户fd=0,fd=3的文件描述符的都事件已经就绪。
同理writefds,exceptfds也是一样。

void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

通过这四个函数对文件描述符位图进行操作。
FD_CLR将一个文件描述符从位图中清除掉
FD_ISSET判断文件描述符是否在位图中
FD_SET将文件描述符加入位图中
FD_ZERO将位图全部清零

二、select的缺点

1.由于fd_set的大小有限,导致select能够监视的文件描述符有限。
2.select必须借助数组来维护文件描述符。
3.select的大部分参数是输入出型参数,每次调用完都需要重新更新所有的位图,且每次都需要遍历数组找到就绪的文件描述符。
ulimit -a查看进程的文件描述符的个数

三、基于select实现Server

实现一个基于select的IO复用服务器。
listen套接字可以交给select监视,listen套接字有新连接,也就是连接就绪事件,客户端申请连接会向服务器发送消息,相当与读事件就绪。
accept需要等待有连接,再获取连接。所以将listen套接字加入读事件位图,当有连接时select再返回。
需要维护一个文件描述符数组。因为需要知道有哪些文件描述符被使用,哪些文件描述符没有被使用。这个文件描述符数组最大可以存储多少个文件描述符呢? fd_set是位图结构,能储存多少个二进制,就代表能多少个文件描述符的事件可以存储。所以数组的大小就等于位图能代表的文件描述符的数量的大小,sizeof(fd_set)=128byte=1024bit,故数组大小为1024。
获取连接套接字,将套接字文件描述符添加进文件描述符数组中,在数组中会将套接字文件描述符的加入位图中,让select监视套接字文件描述符的事件是否就绪。

main.cc

#include "selectServer.hpp"
#include <memory>
using namespace std;
using namespace select_ns;
std::string handle(const std::string& request){
  return request;
}
static void usage(string proc){
  cerr<<proc<<" need a port!!!!"<<endl;
}
int main(int argc,char* argv[]){
  if(argc!=2){
    usage(argv[0]);
    exit(1);
  }
  uint16_t port=atoi(argv[1]);
  //实现一个处理文件描述符读事件的服务器
  unique_ptr<selectServer> svr(new selectServer(handle,port));
  svr->init();
  svr->start();
}

selectServer.hpp

#pragma once
#include <iostream>
#include "sock.hpp"
#include "public.hpp"
#include <errno.h>
#include <cstring>
#include <sys/select.h>
#include <functional>
using namespace std;
namespace select_ns{
  class selectServer{
  private:
    static const uint16_t defaultPort=8080;
    static const int fdnum=1024;
    static const int defaultfd=-1;
    using func_t=std::function<std::string(const std::string)>;
  public:
    //打印有效文件描述符
    void print(){
      cout<<"socket list:"<<endl;
      for(int i=0;i<fdnum;++i){
        if(_fdarray[i]!=defaultfd)cout<<_fdarray[i]<<" ";
      }
      cout<<endl;
    }
    selectServer(func_t func,uint16_t port=defaultPort)
    :_func(func),_port(port),_listenSockfd(-1),_fdarray(nullptr){}
    void init(){
      _listenSockfd=Sock::Socket();
      Sock::Bind(_listenSockfd,_port);
      Sock::Listen(_listenSockfd);
      _fdarray=new int[fdnum];
      for(int i=0;i<fdnum;++i)_fdarray[i]=defaultfd;
      _fdarray[0]=_listenSockfd;
    }
    //监听套接字的读事件就绪
    void accepter(){
      //listenSockfd监听套接字的读事件就绪,也就是监听套接字下有客户端申请新连接
      string clientIp;
      uint16_t clientPort;
      int sockfd=Sock::Accept(_listenSockfd,&clientIp,&clientPort);
      if(sockfd==-1)return;
      log(NORMAL,"accept a new link from %s-%d,sockfd=%d",clientIp.c_str(),clientPort,sockfd);
      int i=0;
      for(;i<fdnum;++i){
        if(_fdarray[i]==defaultfd)break;
      }
      if(i==fdnum){
        log(WARNING,"fdarray is full!!! need close fd");
        close(sockfd);
      }
      else{
        _fdarray[i]=sockfd;
      }
      print();
    }
    //连接套接字的读事件就绪
    void recver(int sockfd,int pos){
      char buffer[1024];
      //不能保证读完一个完整的应用层报文
      ssize_t n=recv(sockfd,buffer,sizeof(buffer)-1,0);
      if(n>0){
        buffer[n]=0;
        std::cout<<"read a meassge:"<<buffer<<std::endl;
      }
      //连接被关闭
      else if(n==0){
        close(sockfd);
        _fdarray[pos]=defaultfd;
        std::cout<<"client close sockfd!!!"<<std::endl;
      }
      else{
        close(sockfd);
        _fdarray[pos]=defaultfd;
        std::cerr<<"recv error"<<std::endl;
      }
      //buffer反序列化获得request
      //func处理request,获取response
      //response反序列化,获得返回信息
      std::string response= _func(buffer);
      write(sockfd,response.c_str(),response.size());
    }
    //hanlderEvent处理读事件
    void handlerReadEvent(fd_set& rfds){
      //监听套接字的读事件就绪
      if(FD_ISSET(_fdarray[0],&rfds))accepter();
      for(int i=1;i<fdnum;++i){
        if(_fdarray[i]==defaultfd)continue;
        //连接套接字的读事件就绪
        if(FD_ISSET(_fdarray[i],&rfds))recver(_fdarray[i],i);
      }
    }
    void start(){
      //只处理读事件
      fd_set rfds;
      while(true){
        FD_ZERO(&rfds);
        int maxfd=_fdarray[0];
        //获取最大文件描述符,并将所有文件描述符加入读事件的位图中
        for(int i=0;i<fdnum;++i){
           if(_fdarray[i]==defaultfd)continue;
           FD_SET(_fdarray[i],&rfds);
           if(_fdarray[i]>maxfd)maxfd=_fdarray[i];
        }
        //select阻塞式的监视文件描述符的事件(状态)是否就绪
        int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
        switch(n){
          case 0:
          cout<<"select  timeout"<<endl;
          log(NORMAL,"select  timeout");
          break;
          case -1:
          log(ERROR,"select error,errno=%d,strerror=%s",errno,strerror(errno));
          break;
          default:
          //有读事件就绪
          //找到哪个套接字的读事件就绪了
          handlerReadEvent(rfds);
          break;
        }
        sleep(1);
      }
    }
    ~selectServer(){
    if(_listenSockfd>0)close(_listenSockfd);
    delete[] _fdarray;
    }
  private:
    int _listenSockfd;
    uint16_t _port;
    int* _fdarray;
    func_t _func;
  };
}

sock.hpp

#pragma oncce 
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <strings.h>
#include <unistd.h>
#include <sys/wait.h>
#include "Log.hpp"
#include "public.hpp"
using namespace std;

class Sock{
  static const int backlog=5;
public:
  //创建套接字文件描述符
  static int Socket(){
    int sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd==-1){
      log(ERROR,"create a  socket error");
      exit(SOCKET_ERR);
    }
    else log(NORMAL,"create socket success, sockfd=%d",sockfd);
    //地址复用
    int opt=1;
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
    return sockfd;
  }
  //给套接字绑定端口号
  static void Bind(int sockfd,uint16_t port){
    struct sockaddr_in local;
    socklen_t len=sizeof(local);
    bzero(&local,len);
    local.sin_family=AF_INET;
    local.sin_addr.s_addr=INADDR_ANY;
    local.sin_port=htons(port);
    int n=bind(sockfd,(const struct sockaddr*)&local,len);
    if(n==-1){
      log(ERROR,"bind sockfd error");
      exit(BIND_ERR);
    }
    else log(NORMAL,"bind sockfd success");
  }
  //监听套接字
  static void Listen(int sockfd){
    int n=listen(sockfd,backlog);
    if(n==-1){
      log(ERROR,"listen sockfd error");
      exit(BIND_ERR);
    }
    else log(NORMAL,"listen sockfd success");
  }
  //获取新连接套接字
  static int Accept(int listenSockfd,string* clientIp,uint16_t* clientPort){
    struct sockaddr_in client;
    socklen_t len=sizeof(client);
    bzero(&client,len);
    int sockfd=accept(listenSockfd,(struct sockaddr*)&client,&len);
    if(sockfd==-1)log(ERROR,"accept sockfd error");
    else {
      log(NORMAL,"accept a link socket=%d",sockfd);
      *clientIp=inet_ntoa(client.sin_addr);
      *clientPort=ntohs(client.sin_port);
    }
    return sockfd;
  }
};
     

Log.hpp

#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <cstdio>
#define NORMAL 0
#define DEBUG 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
using namespace std;
class LogMessage{
private:
  string log_path="./log.txt";
  string err_path="./err.txt";
  static string getLevel(int level){
    switch(level){
      case NORMAL:
      return "NORMAL";
      break;
      case DEBUG:
      return "DEBUG";
      break;
      case WARNING:
      return "WARNING";
      break;
      case ERROR:
      return "ERROR";
      break;
      case FATAL:
      return "FATAL";
      break;
      default:
      return "OTHER";
    }
  }
  string getTime(){
    time_t now=time(nullptr);
    struct tm* t=localtime(&now);
    int year=t->tm_year+1900;
    int mon=t->tm_mon+1;
    int day=t->tm_mday;
    int hour=t->tm_hour;
    int min=t->tm_min;
    int sec=t->tm_sec;
    char buffer[64];
    sprintf(buffer,"%d-%02d-%02d %02d:%02d:%02d",year,mon,day,hour,min,sec);
    return buffer;
  }
public:
  void operator()(int level,const char* format,...){
    string lev=getLevel(level);
    string time=getTime();
    va_list list;
    va_start(list,format);
    char msg[1024];
    vsnprintf(msg,sizeof(msg),format,list);
    FILE* log=fopen(log_path.c_str(),"a+");
    FILE* err=fopen(err_path.c_str(),"a+");
    if(log!=nullptr&&err!=nullptr){
      FILE* cur=nullptr;
      if(level==NORMAL||level==DEBUG){
        cur=log;
      }
      else cur=err;
      fprintf(cur,"[%s][%s][%s]\n",lev.c_str(),time.c_str(),msg);
      fclose(log);
      fclose(err);
    }                                                                                                                            
  }
};
static LogMessage log; 

public.hpp

#pragma once
#include <iostream>
enum{SOCKET_ERR=1,BIND_ERR,LISTEN_ERR,ACCEPT_ERR,USAGE_ERR};

原文地址:https://blog.csdn.net/m0_73919451/article/details/143414752

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!