自学内容网 自学内容网

libevent

前言

        libevent是一个开源的事件通知库,它提供了一种机制,可以通知用户何时进行输入/输出操作。它支持多种操作系统,包括Windows、Linux、Unix和Mac OS X。Libevent的特点是高性能、轻量级、跨平台支持,并且支持多种I/O多路复用技术,如epoll、poll、select、kqueue等。它还支持I/O事件、定时器事件和信号事件,并且是线程安全的。libevent基于Reactor模式实现,它允许用户注册事件,然后通过事件循环检测事件是否就绪,当事件就绪时,libevent会调用相应的回调函数进行处理。

        在reactor中,所有的IO都是同步的,但是事件处理是异步的。例如epoll的epoll_wait阻塞等待所有就绪事件,此为同步,事件注册与事件处理分为两个流程,事件处理又可以分别在多个工作线程,此为异步。IO检测是在事件循环中进行,而在处理事件时又会操作IO,此为事件与IO的关系。

        而libevent封装了两种使用层次:一个是程序员手动处理IO,也就是使用基于libevent的事件处理;另一个是libevent自己处理,程序员只需要负责业务逻辑处理。也就是说,如果我们不管网络IO,那么重点就要放在业务处理方面:连接与数据。比如接收连接(read)、主动连接(write)、数据到达(read)、数据发送(write)等等。我们至少应该掌握这些事件类型,这样才能在业务逻辑处理时信手拈来。

libevent封装层次与使用

reactor对象

        在libevent中封装了reactor对象event_base。其中,evbaseIO多路复用类型字段,比如epoll、select、poll等。evsel就是具体的IO多路复用接口,例如添加事件,删除事件,事件分发等。 event事件数组changelist。信号处理evsigsel。就绪事件队列activequeues。延时处理队列active_later_queue。时间事件timeheap。信号事件sigmap。网络事件io。对于reactor对象,我们只需要关注构造(event_base_new)与销毁(event_base_free)便可。

struct event_base {
/** Function pointers and other data to describe this event_base's
 * backend. */
const struct eventop *evsel;
/** Pointer to backend-specific data. */
void *evbase;

/** List of changes to tell backend about at next dispatch.  Only used
 * by the O(1) backends. */
struct event_changelist changelist;

/** Function pointers used to describe the backend that this event_base
 * uses for signals */
const struct eventop *evsigsel;
/** Data to implement the common signal handler code. */
struct evsig_info sig;

...

/* Active event management. */
/** An array of nactivequeues queues for active event_callbacks (ones
 * that have triggered, and whose callbacks need to be called).  Low
 * priority numbers are more important, and stall higher ones.
 */
struct evcallback_list *activequeues;//就绪事件队列
/** The length of the activequeues array */
int nactivequeues;
/** A list of event_callbacks that should become active the next time
 * we process events, but not this time. */
struct evcallback_list active_later_queue;//延时处理队列

    ...

/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io;//io事件

/** Mapping from signal numbers to enabled (added) events. */
struct event_signal_map sigmap;//信号事件

/** Priority queue of events with timeouts. */
struct min_heap timeheap;//时间事件

    ...

};
/** Structure to define the backend of a given event_base. */
struct eventop {
/** The name of this backend. */
const char *name;
/** Function to set up an event_base to use this backend.  It should
 * create a new structure holding whatever information is needed to
 * run the backend, and return it.  The returned pointer will get
 * stored by event_init into the event_base.evbase field.  On failure,
 * this function should return NULL. */
void *(*init)(struct event_base *);
/** Enable reading/writing on a given fd or signal.  'events' will be
 * the events that we're trying to enable: one or more of EV_READ,
 * EV_WRITE, EV_SIGNAL, and EV_ET.  'old' will be those events that
 * were enabled on this fd previously.  'fdinfo' will be a structure
 * associated with the fd by the evmap; its size is defined by the
 * fdinfo field below.  It will be set to 0 the first time the fd is
 * added.  The function should return 0 on success and -1 on error.
 */
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** As "add", except 'events' contains the events we mean to disable. */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** Function to implement the core of an event loop.  It must see which
    added events are ready, and cause event_active to be called for each
    active event (usually via event_io_active or such).  It should
    return 0 on success and -1 on error.
 */
int (*dispatch)(struct event_base *, struct timeval *);
/** Function to clean up and free our data from the event_base. */
void (*dealloc)(struct event_base *);
/** Flag: set if we need to reinitialize the event base after we fork.
 */
int need_reinit;
/** Bit-array of supported event_method_features that this backend can
 * provide. */
enum event_method_feature features;
/** Length of the extra information we should record for each fd that
    has one or more active events.  This information is recorded
    as part of the evmap entry for each fd, and passed as an argument
    to the add and del functions above.
 */
size_t fdinfo_len;
};

事件对象

手动处理IO

        事件对象就包含了事件触发后的回调函数,如果事件是定时事件就要记录最小堆索引。如果是自己处理IO,就使用如下的event对象,我们要给出回调函数。使用event_new构建对象、event_free销毁对象。

struct event {
struct event_callback ev_evcallback;//事件触发后的回调函数

/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;//定时事件fd

struct event_base *ev_base;//reactor类型

union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;

/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;

short ev_events;
short ev_res;/* result passed to event callback */
struct timeval ev_timeout;//超时事件
};

 自动处理IO

        如果我们不自己处理IO,就要使用bufferevent和evconnlistener。在bufferevent中有两个buffer:input、output。分别为读写缓冲区。还有读事件回调readcb和低水平触发回调writecb,wm_read与wm_write分别封装了读写事件的水平线,低水平线表示buffer中最少要有多少个事件才会触发回调,而读事件有低水平默认为0,也就是每一次读事件都会触发回调,而对于高水平线,如果读事件数高于这个数量,就不再处理读事件,对于写事件只有低水平线,默认为0,用户态写缓冲区为空则调用低水平回调,但是我们不需要设置写回调,因为写成功了没有什么要处理。errocb就是处理异常回调。这样的设计我们不需要对出错信息处理,都是在errocb里面处理。

struct bufferevent {
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
    bufferevent behaves. */
const struct bufferevent_ops *be_ops;

/** A read event that triggers when a timeout has happened or a socket
    is ready to read data.  Only used by some subtypes of
    bufferevent. */
struct event ev_read;
/** A write event that triggers when a timeout has happened or a socket
    is ready to write data.  Only used by some subtypes of
    bufferevent. */
struct event ev_write;

/** An input buffer. Only the bufferevent is allowed to add data to
    this buffer, though the user is allowed to drain it. */
struct evbuffer *input;

/** An input buffer. Only the bufferevent is allowed to drain data
    from this buffer, though the user is allowed to add it. */
struct evbuffer *output;

struct event_watermark wm_read;
struct event_watermark wm_write;

bufferevent_data_cb readcb;
bufferevent_data_cb writecb;
/* This should be called 'eventcb', but renaming it would break
 * backward compatibility */
bufferevent_event_cb errorcb;
void *cbarg;

struct timeval timeout_read;
struct timeval timeout_write;

/** Events that are currently enabled: currently EV_READ and EV_WRITE
    are supported. */
short enabled;
};

        evconnlistener则封装了创建、绑定、监听、注册读事件等这些对listenfd的操作。我们需要调用bufferevent_socket_new构建对象、使用bufferevent_free销毁对象。

struct evconnlistener_ops {
int (*enable)(struct evconnlistener *);
int (*disable)(struct evconnlistener *);
void (*destroy)(struct evconnlistener *);
void (*shutdown)(struct evconnlistener *);
evutil_socket_t (*getfd)(struct evconnlistener *);
struct event_base *(*getbase)(struct evconnlistener *);
};

struct evconnlistener {
const struct evconnlistener_ops *ops;
void *lock;
evconnlistener_cb cb;
evconnlistener_errorcb errorcb;
void *user_data;
unsigned flags;
short refcnt;
int accept4_flags;
unsigned enabled : 1;
};

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;

#ifdef _WIN32
if (base && event_base_get_iocp_(base))
return bufferevent_async_new_(base, fd, options);
#endif

if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;

if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket,
    options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

event_assign(&bufev->ev_read, bufev->ev_base, fd,
    EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
    EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);

evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

evbuffer_freeze(bufev->input, 0);
evbuffer_freeze(bufev->output, 1);

return bufev;
}

void
bufferevent_free(struct bufferevent *bufev)
{
BEV_LOCK(bufev);
bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
bufferevent_cancel_all_(bufev);
bufferevent_decref_and_unlock_(bufev);
}

        当我们需要注册事件时,就要用evconnlistener_new_bind绑定fd。其中backlog是指全连接队列的长度。我们需要传入reactor对象、回调函数等。使用evconnlistener_new_free可销毁对象。

struct evconnlistener *
evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
    void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,
    int socklen)
{
struct evconnlistener *listener;
evutil_socket_t fd;
int on = 1;
int family = sa ? sa->sa_family : AF_UNSPEC;
int socktype = SOCK_STREAM | EVUTIL_SOCK_NONBLOCK;

if (backlog == 0)
return NULL;

if (flags & LEV_OPT_CLOSE_ON_EXEC)
socktype |= EVUTIL_SOCK_CLOEXEC;

fd = evutil_socket_(family, socktype, 0);
if (fd == -1)
return NULL;

if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0)
goto err;

if (flags & LEV_OPT_REUSEABLE) {
if (evutil_make_listen_socket_reuseable(fd) < 0)
goto err;
}

if (flags & LEV_OPT_REUSEABLE_PORT) {
if (evutil_make_listen_socket_reuseable_port(fd) < 0)
goto err;
}

if (flags & LEV_OPT_DEFERRED_ACCEPT) {
if (evutil_make_tcp_listen_socket_deferred(fd) < 0)
goto err;
}

if (flags & LEV_OPT_BIND_IPV6ONLY) {
if (evutil_make_listen_socket_ipv6only(fd) < 0)
goto err;
}

if (sa) {
if (bind(fd, sa, socklen)<0)
goto err;
}

listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);
if (!listener)
goto err;

return listener;
err:
evutil_closesocket(fd);
return NULL;
}

void
evconnlistener_free(struct evconnlistener *lev)
{
LOCK(lev);
lev->cb = NULL;
lev->errorcb = NULL;
if (lev->ops->shutdown)
lev->ops->shutdown(lev);
listener_decref_and_unlock(lev);
}

事件操作

        libevent中的事件操作也分注册手动事件和自动事件:注册手动事件event_add,销毁手动事件event_del,注册自动事件bufferevent_enable,注销自动事件bufferevent_disable。

int
event_add(struct event *ev, const struct timeval *tv)
{
int res;

if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}

EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_add_nolock_(ev, tv, 0);

EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return (res);
}

int
event_del(struct event *ev)
{
return event_del_(ev, EVENT_DEL_AUTOBLOCK);
}

int
bufferevent_enable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
short impl_events = event;
int r = 0;

bufferevent_incref_and_lock_(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;

bufev->enabled |= event;

if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
if (r)
event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev));

bufferevent_decref_and_unlock_(bufev);
return r;
}

int
bufferevent_disable(struct bufferevent *bufev, short event)
{
int r = 0;

BEV_LOCK(bufev);
bufev->enabled &= ~event;

if (bufev->be_ops->disable(bufev, event) < 0)
r = -1;
if (r)
event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev));

BEV_UNLOCK(bufev);
return r;
}

事件循环

        libevent使用event_base_loop处理事件循环,循环被封装在内部,所以不再像普通的通信操作要设置循环。

int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;

/* Grab the lock.  We will release it inside evsel.dispatch, and again
 * as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (base->running_loop) {
event_warnx("%s: reentrant invocation.  Only one event_base_loop"
    " can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}

base->running_loop = 1;

clear_time_cache(base);

if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base_(base);

done = 0;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif

base->event_gotterm = base->event_break = 0;

while (!done) {
base->event_continue = 0;
base->n_deferreds_queued = 0;

/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}

if (base->event_break) {
break;
}

tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
 * if we have active events, we just poll new events
 * without waiting.
 */
evutil_timerclear(&tv);
}

/* If we have no events, we just exit */
if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
    !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}

event_queue_make_later_events_active(base);

clear_time_cache(base);

res = evsel->dispatch(base, tv_p);

if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}

update_time_cache(base);

timeout_process(base);

if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
    && N_ACTIVE_CALLBACKS(base) == 0
    && n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));

done:
clear_time_cache(base);
base->running_loop = 0;

EVBASE_RELEASE_LOCK(base, th_base_lock);

return (retval);
}

事件处理

        事件处理则是调用bufferevent_setcb来设置事件回调。

void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
BEV_LOCK(bufev);

bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = eventcb;

bufev->cbarg = cbarg;
BEV_UNLOCK(bufev);
}

libevent优势

  • 高效的并发连接处理:Libevent使用事件驱动的模型,可以在单线程中处理大量并发连接,避免了传统多线程或多进程模型中的上下文切换开销和资源竞争问题。
  • 跨平台的I/O多路复用支持:Libevent提供了多种I/O多路复用技术的支持,如select、poll、epoll、kqueue等,能够在不同操作系统上选择最合适的机制,保证了高性能的网络I/O操作。
  • 简化的事件管理:Libevent封装了底层的事件管理,提供了简单易用的API,使得开发者可以方便地注册和处理各种事件,包括网络I/O、定时器和信号等。
  • 高性能的缓冲区管理:Libevent通过evbuffer提供了动态的字节队列,用于管理网络数据的读写,支持高效的数据缓冲和流动控制。
  • 异步编程模型:Libevent支持异步非阻塞的socket I/O操作,使得单个线程可以管理成千上万个并发连接,适合构建高性能的网络应用。
  • 丰富的事件类型:Libevent支持多种事件类型,包括读事件、写事件、异常事件、定时事件等,用户可以注册这些事件并指定相应的回调函数来处理。
  • 线程安全:Libevent是线程安全的,可以在多线程环境中使用,每个线程可以拥有自己的事件循环,或者通过适当的同步机制共享事件循环。
  • 灵活的事件回调机制:Libevent允许开发者为不同类型的事件指定回调函数,这些回调函数在事件发生时被调用,提供了灵活的事件处理方式。
  • 支持SSL:Libevent支持SSL加密,使得开发者可以构建安全的网络通信。
  • 轻量级和高性能:Libevent专注于网络功能,代码精炼,易于理解和使用,同时保持了高性能。

原文地址:https://blog.csdn.net/oxygen3000/article/details/142375180

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!