自学内容网 自学内容网

C语言 -- 发布订阅机制

发布订阅机制。

什么是发布订阅机制?

消息的发布订阅也是一种软件的架构方式,主要是用于解耦和消息的传递。在这个消息的发布订阅机制主要有两个主要的,即:消息的发布者和订阅者。而这两者是通过一个媒介来进行消息的传递。
发布订阅机制在生活当中非常容易见到这个机制。例如:微信的公众号,公众号的作者就是发布者,我们作为用户就是一个订阅者。当公众号有信息发布的时候,用户(订阅者)通过公众号这个媒介进行获取到作者(发布者)更新的内容信息。当作者(发布者)发布消息的时候,用户(订阅者)才能通过公众号(媒介)获取到作者(发布者)发布的信息,这三者是缺一不可的,用户没有订阅,就算作者发布了也没人接受到信息,并且也需要公众号这个媒介获取到这个信息。这样一看对这个消息的发布订阅机制是不是比较明了了。

三要素

  • 发布者:字面意思主要是发布消息或者事件的,它也不需要去关注谁接受这些信息,也不需要关注哪些用户去订阅了
  • 订阅者:订阅者也就是对某些事件或者消息进行订阅,通过这个消息或者事件可以接受到发布者的消息。
  • 媒介:发布者和订阅者都需要通过这个媒介才能完成一整个闭环。

c语言的实现

那么,知道了这个实现的逻辑,是不是就可以直接开始写代码了。对于这个c语言当中实现这个消息的发布订阅机制我能想到的就是函数指针并且搭配上list一起进行实现。我这里搭配的是linux 的list,我就默认大家都是接触过这个linux/list的使用方法了。

一个创建函数。
一个析构函数。
一个发布的消息函数。
一个订阅的消息函数。
一个取消订阅函数。
事件的查找函数 – 这个函数的设计主要也是考虑到事件ID和事件的回调函数需要一一对应。

对于头文件的设计,我这里使用的是list链表来进行管理动态消息的订阅和消息事件。这里我做了三个结构体用于管理,在publisher_t结构体将所有事件聚合在一起,对所有事件的管理集中化。event_t这个结构体的设计主要是用于管理单个的事件处理,存储了事件的id和订阅者的管理。最顶层当中设计了是订阅者。
另外考虑到多线程的使用还引入了锁。

typedef void (*message_callback_t)(void *message);

typedef struct subscriber
{
    list_t list;
    message_callback_t callback;
} subscriber_t;

typedef struct event
{
    int event_id;
    list_t subscribers;
    list_t list;
} event_t;

typedef struct
{
    list_t events;
} publisher_t;

static pthread_mutex_t lock;

#define MUTEX_INIT()         pthread_mutex_init(&lock, NULL)
#define MUTEX_LOCK()         pthread_mutex_lock(&lock)
#define MUTEX_UNLOCK()       pthread_mutex_unlock(&lock)
#define MBUS_LOCK_DESTROY()  pthread_mutex_destroy(&lock)
  • 事件的查找函数 – 主要是利用list进行遍历一遍链表查找event_t, 如果查找不到的话就申请个内存返回出去。
static event_t *find_or_create_event(publisher_t *publisher, int event_id)
{
    list_t *pos;
    list_for_each(pos, &publisher->events)
    {
        event_t *event = list_entry(pos, event_t, list);
        if (event->event_id == event_id)
        {
            return event;
        }
    }

    event_t *new_event = (event_t *)malloc(sizeof(event_t));
    if (!new_event)
    {
        printf("[find_or_create_event] malloc error\r\n");
        return NULL;
    }
    new_event->event_id = event_id;
    list_init(&new_event->subscribers);
    list_insert_after(&publisher->events, &new_event->list);
    return new_event;
}
  • 创建函数 – 主要就是申请内存和初始化节点并将指针抛出。
publisher_t *publisher_create(void)
{
    publisher_t *publisher = (publisher_t *)malloc(sizeof(publisher_t));
    if (!publisher)
    {
        printf("[publisher_create] Failed to initialize publisher");
        return NULL;
    }
    list_init(&publisher->events);
    MUTEX_INIT();
    return publisher;
}
  • 析构函数 – 这里主要就是释放申请的内存和删除节点、删除锁。
void publisher_destroy(publisher_t *publisher)
{
    MUTEX_LOCK();

    list_t *pos, *n;
    list_for_each_safe(pos, n, &publisher->events)
    {
        event_t *event =  list_entry(pos, event_t, list);
        list_t *sub_pos, *sub_n;
        list_for_each_safe(sub_pos, sub_n, &event->subscribers)
        {
            subscriber_t *subscriber =  list_entry(sub_pos, subscriber_t, list);
            free(subscriber);
        }
        list_remove(&event->list);
        free(event);
    }

    free(publisher);

    MUTEX_UNLOCK();
    MBUS_LOCK_DESTROY();
}

  • 订阅函数 – 订阅者和发布者都需要一个媒介,我这设计需要传入一个时间ID和有事件之后需要处理的事件回调函数,并且插入到节点当中。
void subscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    subscriber_t *new_subscriber = (subscriber_t *)malloc(sizeof(subscriber_t));
    if (!new_subscriber)
    {
        printf("[subscribe] Failed to subscribe\r\n");
        MUTEX_UNLOCK();
        return;
    }
    new_subscriber->callback = callback;
    list_insert_after(&event->subscribers, &new_subscriber->list);

    MUTEX_UNLOCK();
}

  • 取消订阅函数 – 这个和订阅函数相反,就是删除节点和释放内存。
void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    list_t *pos, *n;
    list_for_each_safe(pos, n, &event->subscribers)
    {
        subscriber_t *subscriber =  list_entry(pos, subscriber_t, list);
        if (subscriber->callback == callback)
        {
            list_remove(&subscriber->list);
            free(subscriber);
            MUTEX_UNLOCK();
            return;
        }
    }

    MUTEX_UNLOCK();
}

  • 发布函数 – 发布函数发布事件ID并且通知这个时间的订阅者,对应的去执行回调函数,另外传入了一个msg消息参数。
void publish(publisher_t *publisher, int event_id, void *message)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    list_t *pos;
    list_for_each(pos, &event->subscribers)
    {
        subscriber_t *subscriber =  list_entry(pos, subscriber_t, list);
        subscriber->callback(message);
    }

    MUTEX_UNLOCK();
}

源码

mbus.c

#include <stdlib.h>
#include <stdio.h>
#include "mbus.h"

static event_t *find_or_create_event(publisher_t *publisher, int event_id)
{
    list_t *pos;
    list_for_each(pos, &publisher->events)
    {
        event_t *event = list_entry(pos, event_t, list);
        if (event->event_id == event_id)
        {
            return event;
        }
    }

    event_t *new_event = (event_t *)malloc(sizeof(event_t));
    if (!new_event)
    {
        printf("[find_or_create_event] malloc error\r\n");
        return NULL;
    }
    new_event->event_id = event_id;
    list_init(&new_event->subscribers);
    list_insert_after(&publisher->events, &new_event->list);
    return new_event;
}

publisher_t *publisher_create(void)
{
    publisher_t *publisher = (publisher_t *)malloc(sizeof(publisher_t));
    if (!publisher)
    {
        printf("[publisher_create] Failed to initialize publisher");
        return NULL;
    }
    list_init(&publisher->events);
    MUTEX_INIT();
    return publisher;
}

void subscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    subscriber_t *new_subscriber = (subscriber_t *)malloc(sizeof(subscriber_t));
    if (!new_subscriber)
    {
        printf("[subscribe] Failed to subscribe\r\n");
        MUTEX_UNLOCK();
        return;
    }
    new_subscriber->callback = callback;
    list_insert_after(&event->subscribers, &new_subscriber->list);

    MUTEX_UNLOCK();
}

void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    list_t *pos, *n;
    list_for_each_safe(pos, n, &event->subscribers)
    {
        subscriber_t *subscriber =  list_entry(pos, subscriber_t, list);
        if (subscriber->callback == callback)
        {
            list_remove(&subscriber->list);
            free(subscriber);
            MUTEX_UNLOCK();
            return;
        }
    }

    MUTEX_UNLOCK();
}

void publish(publisher_t *publisher, int event_id, void *message)
{
    MUTEX_LOCK();

    event_t *event = find_or_create_event(publisher, event_id);
    if (!event)
    {
        MUTEX_UNLOCK();
        return;
    }

    list_t *pos;
    list_for_each(pos, &event->subscribers)
    {
        subscriber_t *subscriber =  list_entry(pos, subscriber_t, list);
        subscriber->callback(message);
    }

    MUTEX_UNLOCK();
}

void publisher_destroy(publisher_t *publisher)
{
    MUTEX_LOCK();

    list_t *pos, *n;
    list_for_each_safe(pos, n, &publisher->events)
    {
        event_t *event =  list_entry(pos, event_t, list);
        list_t *sub_pos, *sub_n;
        list_for_each_safe(sub_pos, sub_n, &event->subscribers)
        {
            subscriber_t *subscriber =  list_entry(sub_pos, subscriber_t, list);
            free(subscriber);
        }
        list_remove(&event->list);
        free(event);
    }

    free(publisher);

    MUTEX_UNLOCK();
    MBUS_LOCK_DESTROY();
}


mbus.h

#ifndef MBUS_H
#define MBUS_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <pthread.h>

#include "list.h"


#ifdef __cplusplus
extern "C" {
#endif

/**
 * @brief 自行对接不同平台下的互斥锁
 * 
 */
static pthread_mutex_t lock;

#define MUTEX_INIT()         pthread_mutex_init(&lock, NULL)
#define MUTEX_LOCK()         pthread_mutex_lock(&lock)
#define MUTEX_UNLOCK()       pthread_mutex_unlock(&lock)
#define MBUS_LOCK_DESTROY()  pthread_mutex_destroy(&lock)

typedef void (*message_callback_t)(void *message);


typedef struct subscriber
{
    list_t list;
    message_callback_t callback;
} subscriber_t;

typedef struct event
{
    int event_id;
    list_t subscribers;
    list_t list;
} event_t;

typedef struct
{
    list_t events;
} publisher_t;

void publisher_destroy(publisher_t *publisher);
void publish(publisher_t *publisher, int event_id, void *message);
void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback);
void subscribe(publisher_t *publisher, int event_id, message_callback_t callback);
publisher_t *publisher_create(void);


#ifdef __cplusplus
}
#endif

#endif /* MBUS_H */



main.c

#include "mbus.h"

typedef enum
{
    MBUS_EVENT_ID_1 = 1,
    MBUS_EVENT_ID_2,
    MBUS_EVENT_ID_3,
    MBUS_EVENT_ID_MAX,
} mbus_event_id_e;

static void mbus_event_cb(void *user_data)
{
    mbus_event_id_e event_id = *(mbus_event_id_e*)user_data;
    printf("event_id :%d\r\n", event_id);
}


int main(void)
{
    printf("[%s] \n", __FUNCTION__);
    publisher_t *my_event_mbus = publisher_create();
    if(!my_event_mbus)
    {
        printf("creata file\r\n");
        return 0;
    }
    for(int i = 1; i < MBUS_EVENT_ID_MAX; i++)
    {
        subscribe(my_event_mbus, i, mbus_event_cb);
        publish(my_event_mbus, i, &i);
    }
    return 0;
}

运行结果:
在这里插入图片描述


原文地址:https://blog.csdn.net/weixin_51914919/article/details/142706232

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!