SRS流媒体源码解析--service
本文主要解析一下SRS3.0 service部分源码,主要和srs_service_st模块。
srs_service_st
模块包含了网络服务的基础实现,特别是与套接字(sockets)和网络通信相关的功能。主要功能和特点包括:
(1)初始化和关闭: srs_st_init 和 srs_close_stfd 函数用于初始化网络服务和关闭套接字。
(2)错误处理: 使用 srs_error_t 类型来处理和报告错误。
(3)网络选项设置: 设置套接字选项。
(4)TCP 和 UDP 监听: srs_tcp_listen 和 srs_udp_listen 函数用于创建 TCP 和 UDP 监听服务。
连接管理: SrsTcpClient 类提供了 TCP 客户端连接的管理,包括连接、读取、写入和关闭连 接。套接字操作: 提供了对套接字文件描述符(srs_netfd_t)的操作,如打开、关闭、读取 和写入。
头文件定义:
(1)对线程/协程的API的抽象
// 存储线程的句柄
typedef void* srs_thread_t;
// 条件变量
typedef void* srs_cond_t;
// 锁
typedef void* srs_mutex_t;
// 获取线程/协程句柄
extern srs_thread_t srs_thread_self();
// 条件变量操作
// 创建一个新的条件变量实例
extern srs_cond_t srs_cond_new();
// 销毁给定的条件变量
extern int srs_cond_destroy(srs_cond_t cond);
// 使当前线程等待条件变量cond。线程会释放锁并阻塞,直到其他线程通过srs_cond_signal或srs_cond_broadcast唤醒它
extern int srs_cond_wait(srs_cond_t cond);
// 带超时的等待版本,如果在指定的timeout时间内条件未被满足,则函数会返回。
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
// 唤醒一个等待在条件变量cond上的线程。如果没有线程等待,则什么也不做。
extern int srs_cond_signal(srs_cond_t cond);
// 互斥锁操作
// 创建一个新的互斥锁实例。
extern srs_mutex_t srs_mutex_new();
// 销毁给定的互斥锁
extern int srs_mutex_destroy(srs_mutex_t mutex);
// 加锁操作,如果锁已被持有,则调用线程将阻塞,直到锁可用
extern int srs_mutex_lock(srs_mutex_t mutex);
// 解锁操作,释放由当前线程持有的锁
extern int srs_mutex_unlock(srs_mutex_t mutex);
(2)对网络相关的API进一步封装
```cpp
// 定义网络文件描述符类型,通常用于表示网络连接。
typedef void* srs_netfd_t;
// 初始化ST(可能是底层事件处理库,如libevent),要求使用epoll作为事件驱动机制。
extern srs_error_t srs_st_init();
// 关闭网络文件描述符stfd,同时确保底层的文件描述符也被关闭。
extern void srs_close_stfd(srs_netfd_t& stfd);
// 设置文件描述符fd的FD_CLOEXEC标志,使得在执行exec函数族时自动关闭该文件描述符。
extern srs_error_t srs_fd_closeexec(int fd);
// 设置文件描述符fd的SO_REUSEADDR选项,允许立即绑定到最近使用的且处于TIME_WAIT状态的端口。
extern srs_error_t srs_fd_reuseaddr(int fd);
// 设置文件描述符fd的SO_REUSEPORT选项,允许多个套接字绑定到同一个端口上。
extern srs_error_t srs_fd_reuseport(int fd);
// 设置文件描述符fd的SO_KEEPALIVE选项,启用TCP Keepalive机制,检测连接是否存活。
extern srs_error_t srs_fd_keepalive(int fd);
// 客户端发起TCP连接到指定服务器,server为服务器地址,port为端口号,tm为连接超时时间,成功后pstfd中存放新连接的网络文件描述符。
extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
// 服务器端监听TCP端点,ip为监听地址,port为端口号,成功后pfd中存放监听的网络文件描述符。
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);
// 服务器端监听UDP端点,与srs_tcp_listen类似,但针对UDP协议。
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);
// 获取网络文件描述符对应的底层文件描述符。
extern int srs_netfd_fileno(srs_netfd_t stfd);
// 暂停当前线程执行指定的微秒数。
extern int srs_usleep(srs_utime_t usecs);
// 根据现有的操作系统文件描述符osfd创建一个网络文件描述符。
extern srs_netfd_t srs_netfd_open_socket(int osfd);
// 直接从现有操作系统文件描述符osfd创建一个网络文件描述符,不特指socket。
extern srs_netfd_t srs_netfd_open(int osfd);
// 接收来自stfd的数据,支持UDP,buf为接收缓冲区,len为缓冲区大小,from为发送方地址信息,fromlen为发送方地址长度,tm为超时时间。
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
// 接受stfd上的连接请求,针对TCP服务器,成功后返回新的连接文件描述符及客户端地址信息。
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);
// 从stfd读取数据,nbyte指定要读取的字节数,tm为读取操作的超时时间。
extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
// 判断给定的超时时间tm是否表示永不超时。
extern bool srs_is_never_timeout(srs_utime_t tm);
```
(3)实现自动管理互斥锁的功能
// 快速创建锁,##instance是宏参数,会被替换为&instance并且和_srs_auto_free_拼接
#define SrsLocker(instance) \
impl__SrsLocker _srs_auto_free_##instance(&instance)
class impl__SrsLocker
{
private:
srs_mutex_t* lock; // 指向互斥锁的指针
public:
// 构造函数,初始化时加锁
impl__SrsLocker(srs_mutex_t* l) {
lock = l;
int r0 = srs_mutex_lock(*lock); // 加锁
srs_assert(!r0); // 断言加锁成功
}
// 析构函数,自动解锁
virtual ~impl__SrsLocker() {
int r0 = srs_mutex_unlock(*lock); // 解锁
srs_assert(!r0); // 断言解锁成功
}
};
(4)网络通信。
// SrsStSocket 类实现了基于ST(Simple Transport)的TCP套接字封装,
// 提供了同步的socket通信机制,集成于ISrsProtocolReadWriter接口,用于网络数据的读写操作。
class SrsStSocket : public ISrsProtocolReadWriter
{
private:
// 接收和发送超时时间,单位为微秒(srs_utime_t)。
// 使用SRS_UTIME_NO_TIMEOUT表示无超时。
srs_utime_t rtm; // 接收超时时间
srs_utime_t stm; // 发送超时时间
// 已接收和已发送的数据量,单位为字节。
int64_t rbytes; // 已接收的总字节数
int64_t sbytes; // 已发送的总字节数
// 底层ST网络文件描述符,用于实际的I/O操作。
srs_netfd_t stfd;
public:
// 默认构造函数
SrsStSocket();
// 析构函数,释放资源
virtual ~SrsStSocket();
public:
// 初始化函数,使用给定的st网络文件描述符fd来设置套接字。
// 注意:用户需自行管理fd的生命期。
virtual srs_error_t initialize(srs_netfd_t fd);
public:
// 设置接收超时时间
virtual void set_recv_timeout(srs_utime_t tm);
// 获取当前接收超时时间设置
virtual srs_utime_t get_recv_timeout();
// 设置发送超时时间
virtual void set_send_timeout(srs_utime_t tm);
// 获取当前发送超时时间设置
virtual srs_utime_t get_send_timeout();
// 获取已接收的总字节数
virtual int64_t get_recv_bytes();
// 获取已发送的总字节数
virtual int64_t get_send_bytes();
public:
// 从套接字读取数据到buf,可指定实际读取的字节数。
// @param buf 目标缓冲区
// @param size 缓冲区大小
// @param nread 输出参数,实际读取的字节数,可选
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
// 确保从套接字完全读取size个字节到buf。
// @param buf 目标缓冲区
// @param size 必须读取的字节数
// @param nread 输出参数,实际读取的字节数,可选
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
// 向套接字写入数据,可指定实际写入的字节数。
// @param buf 源数据缓冲区
// @param size 要写入的数据大小
// @param nwrite 输出参数,实际写入的字节数,可选
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
// 批量写入io矢量数据到套接字,常用于一次写多个缓冲区数据。
// @param iov 指向io矢量数组的指针
// @param iov_size io矢量数组的元素数量
// @param nwrite 输出参数,实际写入的总字节数,可选
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// SrsTcpClient 类实现了ISrsProtocolReadWriter接口,专用于创建TCP客户端连接。
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:
// 网络文件描述符,用于表示与服务器的TCP连接。
srs_netfd_t stfd;
// 持有一个SrsStSocket实例,用于实际的TCP套接字读写操作。
SrsStSocket* io;
// 服务器的主机名或IP地址。
std::string host;
// 服务器监听的端口号。
int port;
// 连接超时时间,单位为srs_utime_t。
srs_utime_t timeout;
public:
// 构造函数,初始化TCP客户端实例。
// @param h 服务器的IP地址或主机名。
// @param p 服务器端口号。
// @param tm 连接超时时间。
SrsTcpClient(std::string h, int p, srs_utime_t tm);
// 析构函数,释放资源。
virtual ~SrsTcpClient();
public:
// 建立与服务器的TCP连接。
// @remark 在尝试连接前,会先关闭现有的连接(如果有)。
virtual srs_error_t connect();
private:
// 关闭与服务器的连接。
// @remark 用户在调用此方法后不应再使用该客户端实例。
virtual void close();
// 以下为ISrsProtocolReadWriter接口的实现
public:
// 设置接收数据的超时时间。
virtual void set_recv_timeout(srs_utime_t tm);
// 获取当前接收数据的超时时间设置。
virtual srs_utime_t get_recv_timeout();
// 设置发送数据的超时时间。
virtual void set_send_timeout(srs_utime_t tm);
// 获取当前发送数据的超时时间设置。
virtual srs_utime_t get_send_timeout();
// 获取已接收的数据字节数。
virtual int64_t get_recv_bytes();
// 获取已发送的数据字节数。
virtual int64_t get_send_bytes();
// 从连接中读取数据。
// @param buf 存储读取数据的缓冲区。
// @param size 缓冲区大小。
// @param nread 实际读取的字节数,输出参数。
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
// 确保读取指定大小的数据到缓冲区。
// @param buf 存储读取数据的缓冲区。
// @param size 需要读取的字节数。
// @param nread 实际读取的字节数,输出参数。
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
// 向连接写入数据。
// @param buf 包含待写数据的缓冲区。
// @param size 缓冲区中数据的大小。
// @param nwrite 实际写入的字节数,输出参数。
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
// 批量写入数据到连接。
// @param iov 指向iovec结构体数组的指针,每个结构体描述一块缓冲区。
// @param iov_size 数组中iovec结构体的数量。
// @param nwrite 实际写入的总字节数,输出参数。
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
源文件实现:
检测是否支持epoll,因为有些老的系统可能不支持epoll
// 设置服务器监听socket的backlog大小为512,这是一个推荐值,用以保证服务器能够有效管理待处理的连接请求队列,
// 特别是在高并发情况下。这个值也是nginx等高性能服务器常用的配置。
#define SERVER_LISTEN_BACKLOG 512
// 下面的代码块仅在Linux系统上编译和执行。
#ifdef __linux__
// 引入epoll相关的头文件,epoll是Linux特有的I/O多路复用技术,适用于处理大量并发的文件描述符。
#include <sys/epoll.h>
// 检查当前Linux系统是否支持epoll功能。
// 这个函数通过尝试执行一个不可能成功的epoll_ctl调用来间接判断。
// 如果系统支持epoll,调用应该失败但是错误码不应该是表示函数未实现的ENOSYS。
bool srs_st_epoll_is_supported(void)
{
// 初始化一个epoll_event结构体,设置其感兴趣的事件为EPOLLIN(表示关心读事件),
// data.ptr字段设置为NULL,因为这次调用只是试探性检查并不关心事件的具体处理。
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL;
// 尝试执行一个无效的epoll_ctl操作:使用-1作为epoll_fd(表示不存在的epoll实例),
// 并尝试添加一个同样无效的文件描述符(-1)到epoll集合中。
// 这样的操作必然会失败,但关键在于检查失败的具体原因。
epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);
// 如果调用失败且错误码是ENOSYS,说明系统不支持epoll;
// 否则,即使调用失败,只要错误码不是ENOSYS,就认为系统支持epoll。
return (errno != ENOSYS);
}
#endif // __linux__
协程初始化
// 初始化SRS使用的st库(simple thread library),并根据操作系统选择最高效的事件处理系统。
srs_error_t srs_st_init() {
#ifdef __linux__
// 检查Linux系统是否支持epoll。epoll是Linux下用于多路复用I/O的高效接口,
// 但一些较旧的Linux版本可能不支持。如果检测到不支持,则返回错误。
if (!srs_st_epoll_is_supported()) {
return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled");
}
#endif
// 根据操作系统选择最佳的事件处理系统。在Linux上,默认尝试使用epoll(),
// 而在BSD等系统上则可能使用kqueue。st_set_eventsys(ST_EVENTSYS_ALT)尝试设置为替代的高效事件系统。
// 如果设置失败,则返回错误信息。
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());
}
// 调用st_init初始化st库。如果初始化失败(返回非零值),则记录错误并返回。
int r0 = 0;
if((r0 = st_init()) != 0){
return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
}
// 如果一切顺利,打印日志表明st初始化成功,并指明当前使用的事件系统。
srs_trace("st_init success, use %s", st_get_eventsys_name());
// 初始化成功,返回srs_success表示无错误。
return srs_success;
}
// 获取当前协程句柄
srs_thread_t srs_thread_self()
{
return (srs_thread_t)st_thread_self();
}
网络文件的关闭和重用
/**
* 关闭指定的网络文件描述符并设置错误检查。
*
* @param stfd 待关闭的网络文件描述符引用。
*
* 此函数确保安全地关闭给定的网络文件描述符(stfd)。在关闭前,
* 它会检查stfd是否有效,然后调用st_netfd_close进行关闭操作。
* 如果关闭操作失败(返回-1),则会触发一个断言错误(srs_assert)。
* 成功关闭后,将stfd设置为NULL,以防止重复关闭或误用。
*/
void srs_close_stfd(srs_netfd_t& stfd) {
if (stfd) {
// 确保关闭操作成功执行
int err = st_netfd_close((st_netfd_t)stfd);
srs_assert(err != -1); // 如果关闭失败,触发断言
stfd = NULL; // 成功关闭后,清空引用
}
}
/**
* 设置文件描述符为close-on-exec并处理错误。
*
* @param fd 要设置的文件描述符。
* @return srs_error_t 操作状态,成功时返回srs_success,失败时返回具体错误信息。
*
* 该函数通过fcntl系统调用来获取文件描述符(fd)的当前标志,并设置FD_CLOEXEC位,
* 确保在执行exec函数族创建新进程时,该文件描述符自动关闭,避免泄露到子进程中。
* 如果设置操作失败,则使用srs_error_new创建并返回一个表示错误的srs_error_t对象。
*/
srs_error_t srs_fd_closeexec(int fd) {
int flags = fcntl(fd, F_GETFD); // 获取当前文件描述符标志
flags |= FD_CLOEXEC; // 设置close-on-exec标志
if (fcntl(fd, F_SETFD, flags) == -1) { // 尝试设置标志
// 设置失败,返回错误信息
return srs_error_new(ERROR_SOCKET_SETCLOSEEXEC, "无法设置FD_CLOEXEC,文件描述符: %d", fd);
}
return srs_success; // 操作成功,返回成功状态
}
// 设置套接字选项:允许地址重用,以便快速重启服务时不被TIME_WAIT状态阻塞
srs_error_t srs_fd_reuseaddr(int fd) {
int v = 1; // 值设为1,启用选项
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)) == -1) { // 尝试设置SO_REUSEADDR
// 如果设置失败,返回一个错误描述
return srs_error_new(ERROR_SOCKET_SETREUSEADDR, "设置SO_REUSEADDR失败,文件描述符: %d", fd);
}
return srs_success; // 设置成功,返回成功状态
}
// 尝试设置套接字选项以启用端口共享(如果操作系统支持)
srs_error_t srs_fd_reuseport(int fd) {
#if defined(SO_REUSEPORT) // 检查SO_REUSEPORT是否被当前系统支持
int v = 1; // 启用选项的值
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &v, sizeof(int)) == -1) { // 尝试设置SO_REUSEPORT
// 如果设置失败,记录一个警告(而非错误),因为并非所有系统都支持此选项
srs_warn("设置SO_REUSEPORT失败,文件描述符: %d", fd);
}
#else // 如果SO_REUSEPORT未定义,提示用户该特性不受支持
#warning "您的操作系统不支持SO_REUSEPORT功能"
srs_warn("您的操作系统不支持SO_REUSEPORT,该功能在Linux内核3.9及以上版本可用");
#endif
return srs_success; // 不管是否设置成功(取决于支持情况),都视为操作成功并返回
}
心跳检测:
/**
* 设置文件描述符的TCP Keepalive选项。
*
* @param fd 需要设置Keepalive选项的文件描述符。
* @return srs_error_t 函数执行结果,成功返回srs_success,失败返回相应的错误信息。
*
* 此函数尝试为给定的文件描述符(fd)启用TCP Keepalive功能。TCP Keepalive是一种机制,
* 用于探测对端是否仍然在线以及连接是否活跃。如果连接在一定时间内没有数据传输,
* 系统将自动发送Keepalive探针以检查连接的状态。这有助于及时发现并关闭空闲或已断开的连接,
* 防止资源的无效占用。
*
* 注意:此功能的可用性依赖于操作系统定义的SO_KEEPALIVE常量。在不支持该选项的平台上,
* 此函数将直接返回成功,因为在那些环境下无法执行Keepalive设置。
*/
srs_error_t srs_fd_keepalive(int fd) {
#ifdef SO_KEEPALIVE // 检查当前平台是否支持SO_KEEPALIVE选项
int v = 1; // 启用Keepalive的标志值
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &v, sizeof(int)) == -1) { // 尝试设置SO_KEEPALIVE选项
// 如果设置失败,返回错误信息
return srs_error_new(ERROR_SOCKET_SETKEEPALIVE, "设置SO_KEEPALIVE失败,文件描述符: %d", fd);
}
#endif
return srs_success; // 设置成功或在不支持的平台上直接返回成功
}
建立TCP连接
/**
* 建立一个TCP连接到指定的服务器和端口。
*
* @param server 服务器的地址(域名或IP)。
* @param port 目标服务器的端口号。
* @param tm 连接超时时间(以微秒计),若为SRS_UTIME_NO_TIMEOUT则无超时限制。
* @param pstfd 输出参数,成功连接后存储连接的文件描述符指针。
* @return srs_error_t 操作结果,成功返回srs_success,失败返回具体的错误信息。
*
* 此函数执行以下步骤建立TCP连接:
* 1. 根据输入的超时时间初始化连接超时变量。
* 2. 清零并准备用于存储连接文件描述符的输出参数。
* 3. 将端口号转换为字符串格式,用于DNS解析。
* 4. 初始化addrinfo结构,设置地址族为任意(AF_UNSPEC),套接字类型为SOCK_STREAM。
* 5. 调用getaddrinfo进行地址解析,获取服务器地址信息。
* 6. 创建一个TCP套接字,根据解析得到的地址族。
* 7. 使用st_netfd_open_socket将原始套接字包装为SRS使用的网络文件描述符。
* 8. 使用st_connect尝试连接到目标服务器,应用超时设置。
* 9. 若连接成功,将网络文件描述符存储到输出参数*pstfd中;失败则关闭文件描述符并返回错误。
*/
srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) {
// 定义一个变量来存储超时时间,初始设为无超时。
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
// 如果传入的超时时间tm不是默认值,则设置超时时间。
if (tm != SRS_UTIME_NO_TIMEOUT) {
timeout = tm;
}
// 初始化传出参数pstfd为NULL。
*pstfd = NULL;
// 定义一个局部变量stfd,初始设为NULL。
srs_netfd_t stfd = NULL;
// 定义一个字符数组来存储端口号的字符串表示。
char sport[8];
// 使用snprintf函数将端口号格式化为字符串。
snprintf(sport, sizeof(sport), "%d", port);
// 定义addrinfo结构体变量hints,用于指定getaddrinfo的搜索条件。
addrinfo hints;
// 将hints清零。
memset(&hints, 0, sizeof(hints));
// 设置搜索条件为协议族不指定,即IPv4和IPv6都搜索。
hints.ai_family = AF_UNSPEC;
// 设置搜索条件为流式套接字。
hints.ai_socktype = SOCK_STREAM;
// 定义addrinfo指针r,用于存储getaddrinfo的结果。
addrinfo* r = NULL;
// 使用SrsAutoFree宏自动释放r指向的内存。
SrsAutoFree(addrinfo, r);
// 调用getaddrinfo获取服务器地址信息。
if(getaddrinfo(server.c_str(), sport, (const addrinfo*)&hints, &r)) {
// 如果失败,返回错误。
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
}
// 根据getaddrinfo的结果创建套接字。
int sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
// 如果创建失败,返回错误。
if(sock == -1){
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
// 断言stfd当前是NULL。
srs_assert(!stfd);
// 打开套接字为网络文件描述符。
stfd = st_netfd_open_socket(sock);
// 如果打开失败,关闭套接字并返回错误。
if(stfd == NULL){
::close(sock);
return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket");
}
// 尝试连接到服务器。
if (st_connect((st_netfd_t)stfd, r->ai_addr, r->ai_addrlen, timeout) == -1){
// 如果连接失败,关闭网络文件描述符并返回错误。
srs_close_stfd(stfd);
return srs_error_new(ERROR_ST_CONNECT, "connect to %s:%d", server.c_str(), port);
}
// 将打开的网络文件描述符赋值给传出参数pstfd。
*pstfd = stfd;
// 如果连接成功,返回成功状态。
return srs_success;
}
设置TCP监听套接字的参数并开始监听指定地址上的连接请求
// 定义一个函数,用于在指定的IP地址和端口上创建并设置TCP监听套接字。
srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) {
// 初始化错误变量为成功状态。
srs_error_t err = srs_success;
// 定义一个字符数组,用于存储端口号的字符串表示。
char sport[8];
// 使用snprintf函数将端口号格式化为字符串。
snprintf(sport, sizeof(sport), "%d", port);
// 定义addrinfo结构体变量hints,用于指定getaddrinfo的搜索条件。
addrinfo hints;
// 将hints清零。
memset(&hints, 0, sizeof(hints));
// 设置搜索条件为协议族不指定,即IPv4和IPv6都搜索。
hints.ai_family = AF_UNSPEC;
// 设置搜索条件为流式套接字。
hints.ai_socktype = SOCK_STREAM;
// 设置搜索条件为只接受数值IP地址。
hints.ai_flags = AI_NUMERICHOST;
// 定义addrinfo指针r,用于存储getaddrinfo的结果。
addrinfo* r = NULL;
// 使用SrsAutoFree宏自动释放r指向的内存。
SrsAutoFree(addrinfo, r);
// 调用getaddrinfo获取IP地址信息。
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
// 如果失败,创建一个新的错误并返回,错误信息中包含hints的值。
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
// 定义一个整型变量fd,用于存储套接字描述符。
int fd = 0;
// 创建套接字。
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
// 如果创建失败,创建一个新的错误并返回,错误信息中包含套接字的域、类型和协议。
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
// 调用do_srs_tcp_listen函数来设置TCP监听套接字。
if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) {
// 如果设置失败,关闭套接字并包装错误信息后返回。
::close(fd);
return srs_error_wrap(err, "fd=%d", fd);
}
// 如果所有操作都成功,返回初始设置的成功状态。
return err;
}
// 定义一个函数,用于设置TCP监听套接字并返回操作结果。
srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) {
// 初始化错误变量为成功状态。
srs_error_t err = srs_success;
// 检测TCP连接的存活性,参考GitHub上SRs的1044号问题。
// @see https://github.com/ossrs/srs/issues/1044
if ((err = srs_fd_keepalive(fd)) != srs_success) {
// 如果设置TCP keepalive失败,返回错误。
return srs_error_wrap(err, "set keepalive");
}
// 设置文件描述符在exec系列函数调用后关闭。
if ((err = srs_fd_closeexec(fd)) != srs_success) {
// 如果设置失败,返回错误。
return srs_error_wrap(err, "set closeexec");
}
// 设置套接字地址重用,允许立即重用本地地址。
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
// 如果设置失败,返回错误。
return srs_error_wrap(err, "set reuseaddr");
}
// 设置端口重用,允许多个套接字绑定到同一端口。
if ((err = srs_fd_reuseport(fd)) != srs_success) {
// 如果设置失败,返回错误。
return srs_error_wrap(err, "set reuseport");
}
// 绑定套接字到指定的地址和端口。
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
// 如果绑定失败,返回错误。
return srs_error_new(ERROR_SOCKET_BIND, "bind");
}
// 开始监听传入的连接请求。
if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
// 如果监听失败,返回错误。
return srs_error_new(ERROR_SOCKET_LISTEN, "listen");
}
// 打开套接字为网络文件描述符。
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
// 如果打开失败,返回错误。
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");
}
// 如果所有操作都成功,返回初始设置的成功状态。
return err;
}
创建UDP套接字并且监听端口
// 函数:do_srs_udp_listen
// 作用:为UDP套接字设置监听相关选项。
srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) {
// 初始化错误状态为成功。
srs_error_t err = srs_success;
// 设置文件描述符在exec()调用后自动关闭。
if ((err = srs_fd_closeexec(fd)) != srs_success) {
// 如果设置失败,包装错误并返回。
return srs_error_wrap(err, "set closeexec");
}
// 允许套接字地址重用,忽略TIME_WAIT状态。
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
// 如果设置失败,包装错误并返回。
return srs_error_wrap(err, "set reuseaddr");
}
// 允许端口重用,用于负载均衡。
if ((err = srs_fd_reuseport(fd)) != srs_success) {
// 如果设置失败,包装错误并返回。
return srs_error_wrap(err, "set reuseport");
}
// 将套接字绑定到指定的地址和端口。
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
// 如果绑定失败,创建新错误并返回。
return srs_error_new(ERROR_SOCKET_BIND, "bind");
}
// 将文件描述符包装为srs_netfd_t类型的套接字。
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
// 如果打开失败,创建新错误并返回。
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");
}
// 如果所有设置成功,返回初始成功状态。
return err;
}
// 函数:srs_udp_listen
// 作用:创建并初始化UDP监听套接字。
srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) {
// 初始化错误状态为成功。
srs_error_t err = srs_success;
// 将端口号转换为字符串形式。
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
// 设置getaddrinfo搜索条件。
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; // 协议族不指定,支持IPv4和IPv6。
hints.ai_socktype = SOCK_DGRAM; // 使用数据报套接字。
hints.ai_flags = AI_NUMERICHOST; // 只接受数值IP地址。
// 动态分配内存用于存储getaddrinfo结果。
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
// 获取IP地址信息。
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
// 如果失败,创建新错误并返回。
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
// 创建套接字。
int fd = 0;
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
// 如果创建失败,创建新错误并返回。
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
// 调用do_srs_udp_listen函数设置监听选项。
if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) {
// 如果设置失败,关闭套接字,包装错误并返回。
::close(fd);
return srs_error_wrap(err, "fd=%d", fd);
}
// 如果所有操作成功,返回初始成功状态。
return err;
}
协程的封装和网络文件描述符的相关操作
// 创建一个新的条件变量。
srs_cond_t srs_cond_new() {
return (srs_cond_t)st_cond_new(); // 调用st_cond_new创建条件变量并转换类型。
}
// 销毁条件变量。
int srs_cond_destroy(srs_cond_t cond) {
return st_cond_destroy((st_cond_t)cond); // 调用st_cond_destroy销毁条件变量。
}
// 等待条件变量。
int srs_cond_wait(srs_cond_t cond) {
return st_cond_wait((st_cond_t)cond); // 调用st_cond_wait等待条件变量。
}
// 带超时的等待条件变量。
int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout) {
return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout); // 调用st_cond_timedwait带超时等待条件变量。
}
// 发送条件变量信号。
int srs_cond_signal(srs_cond_t cond) {
return st_cond_signal((st_cond_t)cond); // 调用st_cond_signal发送条件变量信号。
}
// 创建一个新的互斥锁。
srs_mutex_t srs_mutex_new() {
return (srs_mutex_t)st_mutex_new(); // 调用st_mutex_new创建互斥锁并转换类型。
}
// 销毁互斥锁。
int srs_mutex_destroy(srs_mutex_t mutex) {
if (!mutex) {
return 0; // 如果互斥锁为空,直接返回0。
}
return st_mutex_destroy((st_mutex_t)mutex); // 调用st_mutex_destroy销毁互斥锁。
}
// 加锁互斥锁。
int srs_mutex_lock(srs_mutex_t mutex) {
return st_mutex_lock((st_mutex_t)mutex); // 调用st_mutex_lock加锁。
}
// 解锁互斥锁。
int srs_mutex_unlock(srs_mutex_t mutex) {
return st_mutex_unlock((st_mutex_t)mutex); // 调用st_mutex_unlock解锁。
}
// 获取网络文件描述符的文件编号。
int srs_netfd_fileno(srs_netfd_t stfd) {
return st_netfd_fileno((st_netfd_t)stfd); // 调用st_netfd_fileno获取文件编号。
}
// 使当前线程休眠指定的微秒数。
int srs_usleep(srs_utime_t usecs) {
return st_usleep((st_utime_t)usecs); // 调用st_usleep使线程休眠。
}
// 将操作系统文件描述符包装为网络文件描述符。
srs_netfd_t srs_netfd_open_socket(int osfd) {
return (srs_netfd_t)st_netfd_open_socket(osfd); // 调用st_netfd_open_socket并转换类型。
}
// 将操作系统文件描述符包装为通用文件描述符。
srs_netfd_t srs_netfd_open(int osfd) {
return (srs_netfd_t)st_netfd_open(osfd); // 调用st_netfd_open并转换类型。
}
// 从网络文件描述符接收数据。
int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout) {
return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout); // 调用st_recvfrom接收数据。
}
// 接受连接请求。
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) {
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); // 调用st_accept并转换类型。
}
// 从网络文件描述符读取数据。
ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout) {
return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout); // 调用st_read读取数据。
}
// 检查超时时间是否为永不超时。
bool srs_is_never_timeout(srs_utime_t tm) {
return tm == SRS_UTIME_NO_TIMEOUT; // 比较是否为永不超时的标记值。
}
// SrsStSocket类的构造函数。
SrsStSocket::SrsStSocket() {
// 初始化成员变量。
stfd = NULL;
stm = rtm = SRS_UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
}
// SrsStSocket类的析构函数。
SrsStSocket::~SrsStSocket() {
// 清理工作,当前为空。
}
// 初始化SrsStSocket对象。
srs_error_t SrsStSocket::initialize(srs_netfd_t fd) {
stfd = fd; // 设置网络文件描述符。
return srs_success; // 返回成功状态。
}
// 设置接收超时时间。
void SrsStSocket::set_recv_timeout(srs_utime_t tm) {
rtm = tm; // 设置接收超时。
}
// 获取接收超时时间。
srs_utime_t SrsStSocket::get_recv_timeout() {
return rtm; // 返回接收超时。
}
// 设置发送超时时间。
void SrsStSocket::set_send_timeout(srs_utime_t tm) {
stm = tm; // 设置发送超时。
}
// 获取发送超时时间。
srs_utime_t SrsStSocket::get_send_timeout() {
return stm; // 返回发送超时。
}
// 获取接收的字节数。
int64_t SrsStSocket::get_recv_bytes() {
return rbytes; // 返回接收字节数。
}
// 获取发送的字节数。
int64_t SrsStSocket::get_send_bytes() {
return sbytes; // 返回发送字节数。
}
从套接字读取数据
// SrsStSocket类成员函数,用于从套接字读取数据。
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) {
// 初始化错误状态为成功。
srs_error_t err = srs_success;
// 声明用于存储读取字节数的变量。
ssize_t nb_read;
// 如果接收超时时间设置为永不超时,则使用ST_UTIME_NO_TIMEOUT。
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
// 否则使用设置的超时时间。
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);
}
// 如果nread指针不为空,将读取的字节数赋值给它。
if (nread) {
*nread = nb_read;
}
// 成功时返回实际读取的非负整数(值为0表示网络连接关闭或文件结束)。
// 失败时返回-1,并设置errno以指示错误。
if (nb_read <= 0) {
// 参见GitHub上ossrs/srs的200号问题。
// 如果读取失败,并且errno设置为ETIME,表示超时。
if (nb_read < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
// 如果读取的字节数为0,设置errno为ECONNRESET。
if (nb_read == 0) {
errno = ECONNRESET;
}
// 创建一个新的错误,表示读取失败。
return srs_error_new(ERROR_SOCKET_READ, "read");
}
// 将读取的字节数累加到接收字节计数器。
rbytes += nb_read;
// 返回初始设置的错误状态。
return err;
}
从套接字完全读取数据大小
// SrsStSocket类成员函数,用于从套接字完全读取指定大小的数据。
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) {
// 初始化错误状态为成功。
srs_error_t err = srs_success;
// 声明用于存储读取字节数的变量。
ssize_t nb_read;
// 如果接收超时时间设置为永不超时,则使用ST_UTIME_NO_TIMEOUT。
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
// 否则使用设置的超时时间。
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);
}
// 如果nread指针不为空,则将实际读取的字节数赋值给它。
if (nread) {
*nread = nb_read;
}
// 成功时返回实际读取的非负整数,如果读取的字节数小于请求的nbyte,
// 则表示网络连接关闭或文件结束。
// 失败时返回-1,并设置errno以指示错误。
if (nb_read != (ssize_t)size) {
// 参见GitHub上ossrs/srs的200号问题。
if (nb_read < 0 && errno == ETIME) {
// 如果读取失败,并且errno设置为ETIME,表示超时。
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
// 如果读取的字节数大于等于0但不等于请求的大小,则设置errno为ECONNRESET。
if (nb_read >= 0) {
errno = ECONNRESET;
}
// 创建一个新的错误,表示没有完全读取数据。
return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
}
// 将实际读取的字节数累加到接收字节计数器。
rbytes += nb_read;
// 返回初始设置的错误状态。
return err;
}
TCP连接
// SrsTcpClient类的构造函数,初始化TCP客户端。
SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) {
// 初始化套接字文件描述符为NULL。
stfd = NULL;
// 创建SrsStSocket类的实例。
io = new SrsStSocket();
// 存储提供的主机和端口。
host = h;
port = p;
// 存储超时时间。
timeout = tm;
}
// SrsTcpClient类的析构函数,清理资源。
SrsTcpClient::~SrsTcpClient() {
close(); // 关闭连接并清理套接字资源。
// 释放SrsStSocket实例。
srs_freep(io);
}
// 连接到服务器的成员函数。
srs_error_t SrsTcpClient::connect() {
// 初始化错误状态为成功。
srs_error_t err = srs_success;
close(); // 先关闭任何现有的连接。
// 断言确保套接字文件描述符是NULL。
srs_assert(stfd == NULL);
// 尝试通过srs_tcp_connect函数创建TCP连接。
if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {
// 如果连接失败,包装错误并返回。
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
// 使用创建的套接字文件描述符初始化I/O对象。
if ((err = io->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: init socket object");
}
return err; // 返回错误状态。
}
// 关闭TCP连接的成员函数。
void SrsTcpClient::close() {
// 如果I/O对象不存在,则忽略关闭操作。
if (!io) {
return;
}
// 关闭套接字。
srs_close_stfd(stfd);
}
// 设置接收超时时间的成员函数。
void SrsTcpClient::set_recv_timeout(srs_utime_t tm) {
io->set_recv_timeout(tm); // 委托给I/O对象设置。
}
// 获取接收超时时间的成员函数。
srs_utime_t SrsTcpClient::get_recv_timeout() {
return io->get_recv_timeout(); // 委托给I/O对象获取。
}
// 设置发送超时时间的成员函数。
void SrsTcpClient::set_send_timeout(srs_utime_t tm) {
io->set_send_timeout(tm); // 委托给I/O对象设置。
}
// 获取发送超时时间的成员函数。
srs_utime_t SrsTcpClient::get_send_timeout() {
return io->get_send_timeout(); // 委托给I/O对象获取。
}
// 获取接收字节数的成员函数。
int64_t SrsTcpClient::get_recv_bytes() {
return io->get_recv_bytes(); // 委托给I/O对象获取。
}
// 获取发送字节数的成员函数。
int64_t SrsTcpClient::get_send_bytes() {
return io->get_send_bytes(); // 委托给I/O对象获取。
}
// 从TCP连接中读取数据的成员函数。
srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread) {
return io->read(buf, size, nread); // 委托给I/O对象执行读取。
}
// 从TCP连接中完全读取数据的成员函数。
srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread) {
return io->read_fully(buf, size, nread); // 委托给I/O对象执行完全读取。
}
// 向TCP连接写入数据的成员函数。
srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite) {
return io->write(buf, size, nwrite); // 委托给I/O对象执行写入。
}
// 向TCP连接写入多个缓冲区数据的成员函数。
srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite) {
return io->writev(iov, iov_size, nwrite); // 委托给I/O对象执行多缓冲区写入。
}
这是一条吃饭博客,由挨踢零声赞助。学C/C++就找挨踢零声,加入挨踢零声,面试不挨踢!
原文地址:https://blog.csdn.net/weixin_54423699/article/details/140293361
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!