C语言 -- 发布订阅机制
发布订阅机制。
什么是发布订阅机制?
消息的发布订阅也是一种软件的架构方式,主要是用于解耦和消息的传递。在这个消息的发布订阅机制主要有两个主要的,即:消息的发布者和订阅者。而这两者是通过一个媒介来进行消息的传递。
发布订阅机制在生活当中非常容易见到这个机制。例如:微信的公众号,公众号的作者就是发布者,我们作为用户就是一个订阅者。当公众号有信息发布的时候,用户(订阅者)通过公众号这个媒介进行获取到作者(发布者)更新的内容信息。当作者(发布者)发布消息的时候,用户(订阅者)才能通过公众号(媒介)获取到作者(发布者)发布的信息,这三者是缺一不可的,用户没有订阅,就算作者发布了也没人接受到信息,并且也需要公众号这个媒介获取到这个信息。这样一看对这个消息的发布订阅机制是不是比较明了了。
三要素
- 发布者:字面意思主要是发布消息或者事件的,它也不需要去关注谁接受这些信息,也不需要关注哪些用户去订阅了
- 订阅者:订阅者也就是对某些事件或者消息进行订阅,通过这个消息或者事件可以接受到发布者的消息。
- 媒介:发布者和订阅者都需要通过这个媒介才能完成一整个闭环。
c语言的实现
那么,知道了这个实现的逻辑,是不是就可以直接开始写代码了。对于这个c语言当中实现这个消息的发布订阅机制我能想到的就是函数指针,并且搭配上list一起进行实现。我这里搭配的是linux 的list,我就默认大家都是接触过这个linux/list的使用方法了。
一个创建函数。
一个析构函数。
一个发布的消息函数。
一个订阅的消息函数。
一个取消订阅函数。
事件的查找函数 – 这个函数的设计主要也是考虑到事件ID和事件的回调函数需要一一对应。
对于头文件的设计,我这里使用的是list链表来进行管理动态消息的订阅和消息事件。这里我做了三个结构体用于管理,在publisher_t结构体将所有事件聚合在一起,对所有事件的管理集中化。event_t这个结构体的设计主要是用于管理单个的事件处理,存储了事件的id和订阅者的管理。最顶层当中设计了是订阅者。
另外考虑到多线程的使用还引入了锁。
typedef void (*message_callback_t)(void *message);
typedef struct subscriber
{
list_t list;
message_callback_t callback;
} subscriber_t;
typedef struct event
{
int event_id;
list_t subscribers;
list_t list;
} event_t;
typedef struct
{
list_t events;
} publisher_t;
static pthread_mutex_t lock;
#define MUTEX_INIT() pthread_mutex_init(&lock, NULL)
#define MUTEX_LOCK() pthread_mutex_lock(&lock)
#define MUTEX_UNLOCK() pthread_mutex_unlock(&lock)
#define MBUS_LOCK_DESTROY() pthread_mutex_destroy(&lock)
- 事件的查找函数 – 主要是利用list进行遍历一遍链表查找event_t, 如果查找不到的话就申请个内存返回出去。
static event_t *find_or_create_event(publisher_t *publisher, int event_id)
{
list_t *pos;
list_for_each(pos, &publisher->events)
{
event_t *event = list_entry(pos, event_t, list);
if (event->event_id == event_id)
{
return event;
}
}
event_t *new_event = (event_t *)malloc(sizeof(event_t));
if (!new_event)
{
printf("[find_or_create_event] malloc error\r\n");
return NULL;
}
new_event->event_id = event_id;
list_init(&new_event->subscribers);
list_insert_after(&publisher->events, &new_event->list);
return new_event;
}
- 创建函数 – 主要就是申请内存和初始化节点并将指针抛出。
publisher_t *publisher_create(void)
{
publisher_t *publisher = (publisher_t *)malloc(sizeof(publisher_t));
if (!publisher)
{
printf("[publisher_create] Failed to initialize publisher");
return NULL;
}
list_init(&publisher->events);
MUTEX_INIT();
return publisher;
}
- 析构函数 – 这里主要就是释放申请的内存和删除节点、删除锁。
void publisher_destroy(publisher_t *publisher)
{
MUTEX_LOCK();
list_t *pos, *n;
list_for_each_safe(pos, n, &publisher->events)
{
event_t *event = list_entry(pos, event_t, list);
list_t *sub_pos, *sub_n;
list_for_each_safe(sub_pos, sub_n, &event->subscribers)
{
subscriber_t *subscriber = list_entry(sub_pos, subscriber_t, list);
free(subscriber);
}
list_remove(&event->list);
free(event);
}
free(publisher);
MUTEX_UNLOCK();
MBUS_LOCK_DESTROY();
}
- 订阅函数 – 订阅者和发布者都需要一个媒介,我这设计需要传入一个时间ID和有事件之后需要处理的事件回调函数,并且插入到节点当中。
void subscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
subscriber_t *new_subscriber = (subscriber_t *)malloc(sizeof(subscriber_t));
if (!new_subscriber)
{
printf("[subscribe] Failed to subscribe\r\n");
MUTEX_UNLOCK();
return;
}
new_subscriber->callback = callback;
list_insert_after(&event->subscribers, &new_subscriber->list);
MUTEX_UNLOCK();
}
- 取消订阅函数 – 这个和订阅函数相反,就是删除节点和释放内存。
void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
list_t *pos, *n;
list_for_each_safe(pos, n, &event->subscribers)
{
subscriber_t *subscriber = list_entry(pos, subscriber_t, list);
if (subscriber->callback == callback)
{
list_remove(&subscriber->list);
free(subscriber);
MUTEX_UNLOCK();
return;
}
}
MUTEX_UNLOCK();
}
- 发布函数 – 发布函数发布事件ID并且通知这个时间的订阅者,对应的去执行回调函数,另外传入了一个msg消息参数。
void publish(publisher_t *publisher, int event_id, void *message)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
list_t *pos;
list_for_each(pos, &event->subscribers)
{
subscriber_t *subscriber = list_entry(pos, subscriber_t, list);
subscriber->callback(message);
}
MUTEX_UNLOCK();
}
源码
mbus.c
#include <stdlib.h>
#include <stdio.h>
#include "mbus.h"
static event_t *find_or_create_event(publisher_t *publisher, int event_id)
{
list_t *pos;
list_for_each(pos, &publisher->events)
{
event_t *event = list_entry(pos, event_t, list);
if (event->event_id == event_id)
{
return event;
}
}
event_t *new_event = (event_t *)malloc(sizeof(event_t));
if (!new_event)
{
printf("[find_or_create_event] malloc error\r\n");
return NULL;
}
new_event->event_id = event_id;
list_init(&new_event->subscribers);
list_insert_after(&publisher->events, &new_event->list);
return new_event;
}
publisher_t *publisher_create(void)
{
publisher_t *publisher = (publisher_t *)malloc(sizeof(publisher_t));
if (!publisher)
{
printf("[publisher_create] Failed to initialize publisher");
return NULL;
}
list_init(&publisher->events);
MUTEX_INIT();
return publisher;
}
void subscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
subscriber_t *new_subscriber = (subscriber_t *)malloc(sizeof(subscriber_t));
if (!new_subscriber)
{
printf("[subscribe] Failed to subscribe\r\n");
MUTEX_UNLOCK();
return;
}
new_subscriber->callback = callback;
list_insert_after(&event->subscribers, &new_subscriber->list);
MUTEX_UNLOCK();
}
void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
list_t *pos, *n;
list_for_each_safe(pos, n, &event->subscribers)
{
subscriber_t *subscriber = list_entry(pos, subscriber_t, list);
if (subscriber->callback == callback)
{
list_remove(&subscriber->list);
free(subscriber);
MUTEX_UNLOCK();
return;
}
}
MUTEX_UNLOCK();
}
void publish(publisher_t *publisher, int event_id, void *message)
{
MUTEX_LOCK();
event_t *event = find_or_create_event(publisher, event_id);
if (!event)
{
MUTEX_UNLOCK();
return;
}
list_t *pos;
list_for_each(pos, &event->subscribers)
{
subscriber_t *subscriber = list_entry(pos, subscriber_t, list);
subscriber->callback(message);
}
MUTEX_UNLOCK();
}
void publisher_destroy(publisher_t *publisher)
{
MUTEX_LOCK();
list_t *pos, *n;
list_for_each_safe(pos, n, &publisher->events)
{
event_t *event = list_entry(pos, event_t, list);
list_t *sub_pos, *sub_n;
list_for_each_safe(sub_pos, sub_n, &event->subscribers)
{
subscriber_t *subscriber = list_entry(sub_pos, subscriber_t, list);
free(subscriber);
}
list_remove(&event->list);
free(event);
}
free(publisher);
MUTEX_UNLOCK();
MBUS_LOCK_DESTROY();
}
mbus.h
#ifndef MBUS_H
#define MBUS_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <pthread.h>
#include "list.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief 自行对接不同平台下的互斥锁
*
*/
static pthread_mutex_t lock;
#define MUTEX_INIT() pthread_mutex_init(&lock, NULL)
#define MUTEX_LOCK() pthread_mutex_lock(&lock)
#define MUTEX_UNLOCK() pthread_mutex_unlock(&lock)
#define MBUS_LOCK_DESTROY() pthread_mutex_destroy(&lock)
typedef void (*message_callback_t)(void *message);
typedef struct subscriber
{
list_t list;
message_callback_t callback;
} subscriber_t;
typedef struct event
{
int event_id;
list_t subscribers;
list_t list;
} event_t;
typedef struct
{
list_t events;
} publisher_t;
void publisher_destroy(publisher_t *publisher);
void publish(publisher_t *publisher, int event_id, void *message);
void unsubscribe(publisher_t *publisher, int event_id, message_callback_t callback);
void subscribe(publisher_t *publisher, int event_id, message_callback_t callback);
publisher_t *publisher_create(void);
#ifdef __cplusplus
}
#endif
#endif /* MBUS_H */
main.c
#include "mbus.h"
typedef enum
{
MBUS_EVENT_ID_1 = 1,
MBUS_EVENT_ID_2,
MBUS_EVENT_ID_3,
MBUS_EVENT_ID_MAX,
} mbus_event_id_e;
static void mbus_event_cb(void *user_data)
{
mbus_event_id_e event_id = *(mbus_event_id_e*)user_data;
printf("event_id :%d\r\n", event_id);
}
int main(void)
{
printf("[%s] \n", __FUNCTION__);
publisher_t *my_event_mbus = publisher_create();
if(!my_event_mbus)
{
printf("creata file\r\n");
return 0;
}
for(int i = 1; i < MBUS_EVENT_ID_MAX; i++)
{
subscribe(my_event_mbus, i, mbus_event_cb);
publish(my_event_mbus, i, &i);
}
return 0;
}
运行结果:
原文地址:https://blog.csdn.net/weixin_51914919/article/details/142706232
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!