项目第十一弹:客户端模块设计与实现
项目第十一弹:客户端设计与消费者管理模块
一、客户端设计
1.模块大框架
客户端其实就很简单了
只需要一个网络模块和业务模块
网络模块负责跟服务器进行通信
业务模块负责构建并发送网络请求,接收并处理网络响应,接收消息进行消费或者向服务器发送消息
2.RabbitMQ弱化客户端提高灵活性
3.模块划分
- 连接模块(无需管理,因为一个连接就是一个客户端)
- 信道管理模块(通过向服务器发送请求来给用户提供相应服务)
那我们还需要什么模块吗?
1.消费者描述模块
1.为何要有消费者模块?
我们之前说过,服务器的消费者描述模块当中的消费处理回调函数只负责:
向该消费者发送消费响应
因为对应真正的消费处理回调函数必须在消费者客户端进行执行
所以客户端也必须要有消费处理回调函数来
真正处理对应的消息,并且确认该消息
2.需要实现额外的管理模块吗?
只不过在客户端这里,因为我们规定一个消费者只能关联一个信道,所以消费者和信道天生就已经绑定到了一起,因此我们无需实现消费者管理模块
只需要实现一个消费者描述结构体即可
3.为何服务器要实现额外的管理模块呢?
服务器也是一个消费者只能关联一个信道啊,为何他就需要实现呢?
因为服务器当中我们要根据队列来负载均衡式选择一个消费者进行消息的推送,因此在服务器模块,消费者必须要跟队列建立联系
所以我们选择将消费者以队列为单位进行管理,而不是信道
【信道的话,在将队列中的消息推送给消费者时就需要遍历所有信道,查找其关联的消费者是否是我这个队列,那样的话效率非常低,而且代码极其不优雅】
而按照队列为单位进行管理,效率高,代码优雅,还能实现负载均衡
2.异步工作线程模块
1.EventLoopThread
2.异步工作线程池
为何我们需要异步工作线程池?
因为我们的消费处理回调函数是一个相对独立,解耦的函数
所以为了解放我们的信道服务执行流,将该函数打包扔到工作线程池当中去完成
将这二者结合起来组成一个新的模块,这个模块就是异步工作线程模块
二、消费者模块实现
其实跟服务器那里的消费者描述一样的:
using ConsumerCallback = std::function<void(const std::string &, const ns_proto::BasicProperities *, const std::string &)>;
struct Consumer
{
using ptr = std::shared_ptr<Consumer>;
Consumer() = default;
Consumer(const std::string &tag, const ConsumerCallback &callback, const std::string &vhost_name, const std::string &qname, bool auto_ack)
: _consumer_tag(tag), _callback(callback), _vhost_name(vhost_name), _queue_name(qname), _auto_ack(auto_ack) {}
std::string _consumer_tag; // 消费者tag(唯一标识)
ConsumerCallback _callback; // 消费者回调函数
std::string _vhost_name; // 消费者队列所在虚拟机名称
std::string _queue_name; // 消费者订阅的队列
bool _auto_ack; // 自动确认标志
};
三、信道管理模块设计与实现
1.BUG?如何解决
跟服务器的信道管理模块差不多,只不过它是组织请求,发送给服务器。
但是有一点需要注意:
因为我们实现的网络服务接口是非阻塞的,而我们用户执行的任务需要一种顺序性,此时就需要条件变量来控制任务执行的同步性
比如:创建队列、绑定队列
客户端执行完创建队列之后,其本质就只是向服务器发送了一个DeclareMsgQueueRequest而已,并未立刻创建该队列
而如果此时直接返回,那么用户就能执行下一步的绑定队列了
一旦BindRequest到达服务器更早,那么队列绑定必然失败,而队列创建却能够成功
此时就坑了,这就是BUG
所以需要条件变量
可是我们怎么确定我们的请求成功被服务器接收并执行了?
通过我们请求当中的req_id即可,它是请求的唯一标识,我们只需要搞一个unordered_map<req_id,Request>的哈希表即可
信道管理模块跟服务器的一样,都是增、删、查,互斥锁+哈希表
2.信道模块的实现
1.对用户提供的服务的实现
注意:信道创建时我们无需用户来提供信道ID,而只需要我们自己在内部创建即可,无需用户操心,这样可以提高用户体验
刚才说了,其实就是:
- 利用参数构建请求
- 发送请求
- 等待响应
- 返回结果
因此我们就可以写出,就是代码多一些,其实思路和步骤都是一样的
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using BasicCommonResponsePtr = std::shared_ptr<BasicCommonResponse>;
using BasicConsumeResponsePtr = std::shared_ptr<BasicConsumeResponse>;
class Channel
{
public:
Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
: _conn(conn), _codec(codec), _channel_id(UUIDHelper::uuid()) {}
void openChannel()
{
// 组织请求
OpenChannelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
}
void closeChannel()
{
// 组织请求
CloseChannelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
}
bool declareVirtualHost(const std::string &vhost_name, const std::string &dbfile, const std::string &basedir)
{
// 组织请求
DeclareVirtualHostRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_dbfile(dbfile);
req.set_basedir(basedir);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
// 返回结果
return resp->ok();
}
bool eraseVirtualHost(const std::string &vhost_name)
{
EraseVirtualHostRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool declareExchange(const std::string &vhost_name, const std::string &exchange_name, const ExchangeType type,
bool durable, bool auto_delete, google::protobuf::Map<std::string, std::string> args)
// 直接现代赋值写法搞定他
{
DeclareExchangeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_type(type);
req.set_durable(durable);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool eraseExchange(const std::string &vhost_name, const std::string &exchange_name)
{
EraseExchangeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool declareMsgQueue(const std::string &vhost_name, const std::string &queue_name, bool durable, bool exclusive,
bool auto_delete, google::protobuf::Map<std::string, std::string> args)
{
DeclareMsgQueueRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
req.set_durable(durable);
req.set_exclusive(exclusive);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool eraseMsgQueue(const std::string &vhost_name, const std::string &queue_name)
{
EraseMsgQueueRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool bind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name, const std::string &binding_key)
{
BindRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_queue_name(queue_name);
req.set_binding_key(binding_key);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool unBind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name)
{
UnbindRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_queue_name(queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool BasicConsume(const std::string &vhost_name, const std::string &consumer_tag, const std::string &queue_name,
const ConsumerCallback &callback, bool auto_ack)
{
if (_consumer.get() != nullptr)
{
default_error("队列订阅失败,因为该信道已经关联消费者了,关联的消费者tag:%s ,该订阅失败的消费者tag:%s",_consumer->_consumer_tag.c_str(),consumer_tag.c_str());
return false;
}
BasicConsumeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_consumer_tag(consumer_tag);
req.set_queue_name(queue_name);
req.set_auto_ack(auto_ack);
// 发送请求
_codec->send(_conn, req);
std::ostringstream oss;
BasicCommonResponsePtr resp = waitResponse(rid);
if (resp->ok())
{
_consumer = std::make_shared<Consumer>(consumer_tag, callback, vhost_name, queue_name, auto_ack);
default_info("关联消费者成功: %s",consumer_tag.c_str());
}
else
{
default_info("关联消费者失败: %s",consumer_tag.c_str());
}
return resp->ok();
}
bool BasicCancel()
{
std::ostringstream oss;
if (_consumer.get() == nullptr)
{
default_error("取消订阅失败 ,因为该信道并未关联消费者");
return false;
}
BasicCancelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(_consumer->_vhost_name);
req.set_consumer_tag(_consumer->_consumer_tag);
req.set_queue_name(_consumer->_queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
if (resp->ok())
{
_consumer.reset();
oss << "取消订阅成功 ,队列名:" << _consumer->_queue_name << "\n";
}
else
{
oss << "服务端取消订阅失败, 故消费者删除失败, 消费者tag: " << _consumer->_consumer_tag << "\n";
}
return resp->ok();
}
bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body)
{
BasicPublishRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_body(body);
if (bp != nullptr)
{
req.mutable_properities()->set_msg_id(bp->msg_id());
req.mutable_properities()->set_mode(bp->mode());
req.mutable_properities()->set_routing_key(bp->routing_key());
}
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
std::ostringstream oss;
if (resp->ok())
{
default_info("发布消息成功 %s",body.c_str());
}
else
{
default_info("发布消息失败 %s",body.c_str());
}
return resp->ok();
}
bool BasicAck(const std::string &vhost_name, const std::string &queue_name, const std::string &msg_id)
{
if (_consumer.get() == nullptr)
{
default_error("确认消息失败 ,因为该信道并未关联消费者");
return false;
}
BasicAckRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
req.set_msg_id(msg_id);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
private:
BasicCommonResponsePtr waitResponse(const std::string &rid)
{
std::unique_lock<std::mutex> ulock(_mutex);
// 当lambda返回true时,才能出来; 注意:this指针只能传值捕捉
// 因为lambda支持拷贝构造,因此lambda可以通过创建副本在该函数外部存活
// 而如果引用捕捉this指针的话,那么在函数外部这个this指针就成为了野指针
// 因此lambda无法捕捉this
_cv.wait(ulock, [&rid, this]() -> bool
{ return _resp_map.count(rid) > 0; });
return _resp_map[rid];
}
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
std::string _channel_id;
Consumer::ptr _consumer; // 该信道关联的消费者
std::mutex _mutex;
std::condition_variable _cv;
std::unordered_map<std::string, BasicCommonResponsePtr> _resp_map;
};
2.对连接模块提供的接口
-
因为只有连接模块才能监听响应服务器发来的BasicCommonResponse
所以需要给连接模块提供一个放哈希表当中放响应的接口 -
当连接模块收到BasicConsumeResponse时,能够通过该resp找到对应的信道,不过无法拿到其内部的消费者的消费处理回调函数进行调用,因此需要信道模块提供一个能够调用其内部消费者的消费处理回调函数的接口
// 给连接模块用的接口
public:
std::string cid() const
{
// 因为_channel_id在构造函数的初始化列表阶段初始化之后就再也不修改了,所以这里无需加锁,提高效率
return _channel_id;
}
// 连接收到基础响应之后,向hash表中添加响应
void putResponse(const BasicCommonResponsePtr &resp)
{
{
std::unique_lock<std::mutex> ulock(_mutex);
_resp_map[resp->req_id()] = resp;
}
_cv.notify_all(); // 把所有阻塞的线程都唤醒,让他们看看自己能否成功继续往下运行
}
// 连接收到消息推送之后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理
void consume(const BasicConsumeResponsePtr &resp)
{
// 1.看该resp的信道是否相同
if (resp->channel_id() != _channel_id)
{
default_info("消息消费失败,因为resp的信道ID跟本信道ID不同:resp->channel_id():%s ,本信道ID:%s",resp->channel_id().c_str(),_channel_id.c_str());
return;
}
// 2.看是否有消费者
if (_consumer.get() == nullptr)
{
default_info("消息消费失败,因为该信道没有消费者");
return;
}
// 3.看该resp的消费者是否相同
if (resp->consumer_tag() != _consumer->_consumer_tag)
{
default_info("消息消费失败,因为resp的消费者tag跟本消费者tag不同:resp->channel_id():%s ,本信道ID:%s",resp->consumer_tag().c_str(),_consumer->_consumer_tag.c_str());
return;
}
// 3.调用该消费者的消费处理回调函数
_consumer->_callback(resp->consumer_tag(),resp->mutable_properities(),resp->body());
}
3.为何客户端的Channel也要有创建信道这个函数?
因为客户端的这些函数都是对服务器发送相应的请求而已
所以客户端的Channel不会受到自身的限制,也可以这么理解:
客户端的Channel的OpenChannel这个函数是创建服务器方的Channel
但是客户端的Connection的OpenChannel则是创建客户端方的Channel
4.完整代码
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using BasicCommonResponsePtr = std::shared_ptr<BasicCommonResponse>;
using BasicConsumeResponsePtr = std::shared_ptr<BasicConsumeResponse>;
class Channel
{
public:
using ptr=std::shared_ptr<Channel>;
Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
: _conn(conn), _codec(codec), _channel_id(UUIDHelper::uuid()) {}
void openChannel()
{
// 组织请求
OpenChannelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
}
void closeChannel()
{
// 组织请求
CloseChannelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
}
bool declareVirtualHost(const std::string &vhost_name, const std::string &dbfile, const std::string &basedir)
{
// 组织请求
DeclareVirtualHostRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_dbfile(dbfile);
req.set_basedir(basedir);
// 发送请求
_codec->send(_conn, req);
// 等待响应
BasicCommonResponsePtr resp = waitResponse(rid);
// 返回结果
return resp->ok();
}
bool eraseVirtualHost(const std::string &vhost_name)
{
EraseVirtualHostRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool declareExchange(const std::string &vhost_name, const std::string &exchange_name, const ExchangeType type,
bool durable, bool auto_delete, google::protobuf::Map<std::string, std::string> args)
// 直接现代赋值写法搞定他
{
DeclareExchangeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_type(type);
req.set_durable(durable);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool eraseExchange(const std::string &vhost_name, const std::string &exchange_name)
{
EraseExchangeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool declareMsgQueue(const std::string &vhost_name, const std::string &queue_name, bool durable, bool exclusive,
bool auto_delete, google::protobuf::Map<std::string, std::string> args)
{
DeclareMsgQueueRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
req.set_durable(durable);
req.set_exclusive(exclusive);
req.set_auto_delete(auto_delete);
req.mutable_args()->swap(args);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool eraseMsgQueue(const std::string &vhost_name, const std::string &queue_name)
{
EraseMsgQueueRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool bind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name, const std::string &binding_key)
{
BindRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_queue_name(queue_name);
req.set_binding_key(binding_key);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool unBind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name)
{
UnbindRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_queue_name(queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
bool BasicConsume(const std::string &vhost_name, const std::string &consumer_tag, const std::string &queue_name,
const ConsumerCallback &callback, bool auto_ack)
{
if (_consumer.get() != nullptr)
{
default_error("队列订阅失败,因为该信道已经关联消费者了,关联的消费者tag:%s ,该订阅失败的消费者tag:%s",_consumer->_consumer_tag.c_str(),consumer_tag.c_str());
}
BasicConsumeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_consumer_tag(consumer_tag);
req.set_queue_name(queue_name);
req.set_auto_ack(auto_ack);
// 发送请求
_codec->send(_conn, req);
std::ostringstream oss;
BasicCommonResponsePtr resp = waitResponse(rid);
if (resp->ok())
{
_consumer = std::make_shared<Consumer>(consumer_tag, queue_name, callback, auto_ack);
default_info("关联消费者成功: %s",consumer_tag.c_str());
}
else
{
default_info("关联消费者失败: %s",consumer_tag.c_str());
}
return resp->ok();
}
bool BasicCancel()
{
std::ostringstream oss;
if (_consumer.get() == nullptr)
{
default_error("取消订阅失败 ,因为该信道并未关联消费者");
return false;
}
BasicCancelRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(_consumer->_vhost_name);
req.set_consumer_tag(_consumer->_consumer_tag);
req.set_queue_name(_consumer->_queue_name);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
if (resp->ok())
{
_consumer.reset();
oss << "取消订阅成功 ,队列名:" << _consumer->_queue_name << "\n";
}
else
{
oss << "服务端取消订阅失败, 故消费者删除失败, 消费者tag: " << _consumer->_consumer_tag << "\n";
}
return resp->ok();
}
bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body)
{
BasicPublishRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_exchange_name(exchange_name);
req.set_body(body);
if (bp != nullptr)
{
req.mutable_properities()->set_msg_id(bp->msg_id());
req.mutable_properities()->set_mode(bp->mode());
req.mutable_properities()->set_routing_key(bp->routing_key());
}
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
std::ostringstream oss;
if (resp->ok())
{
default_info("发布消息成功 %s",body.c_str());
}
else
{
default_info("发布消息失败 %s",body.c_str());
}
return resp->ok();
}
bool BasicAck(const std::string &vhost_name, const std::string &queue_name, const std::string &msg_id)
{
BasicAckRequest req;
std::string rid = UUIDHelper::uuid();
req.set_req_id(rid);
req.set_channel_id(_channel_id);
req.set_vhost_name(vhost_name);
req.set_queue_name(queue_name);
req.set_msg_id(msg_id);
// 发送请求
_codec->send(_conn, req);
BasicCommonResponsePtr resp = waitResponse(rid);
return resp->ok();
}
// 给连接模块用的接口
public:
std::string cid() const
{
// 因为_channel_id在构造函数的初始化列表阶段初始化之后就再也不修改了,所以这里无需加锁,提高效率
return _channel_id;
}
// 连接收到基础响应之后,向hash表中添加响应
void putResponse(const BasicCommonResponsePtr &resp)
{
{
std::unique_lock<std::mutex> ulock(_mutex);
_resp_map[resp->req_id()] = resp;
}
_cv.notify_all(); // 把所有阻塞的线程都唤醒,让他们看看自己能否成功继续往下运行
}
// 连接收到消息推送之后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理
void consume(const BasicConsumeResponsePtr &resp)
{
std::ostringstream oss;
// 1.看该resp的信道是否相同
if (resp->channel_id() != _channel_id)
{
default_info("消息消费失败,因为resp的信道ID跟本信道ID不同:resp->channel_id():%s ,本信道ID:%s",resp->channel_id().c_str(),_channel_id.c_str());
return;
}
// 2.看是否有消费者
if (_consumer.get() == nullptr)
{
default_info("消息消费失败,因为该信道没有消费者");
return;
}
// 3.看该resp的消费者是否相同
if (resp->consumer_tag() != _consumer->_consumer_tag)
{
default_info("消息消费失败,因为resp的消费者tag跟本消费者tag不同:resp->channel_id():%s ,本信道ID:%s",resp->consumer_tag().c_str(),_consumer->_consumer_tag.c_str());
return;
}
// 3.调用该消费者的消费处理回调函数
_consumer->_callback(resp->consumer_tag(),resp->mutable_properities(),resp->body());
}
private:
BasicCommonResponsePtr waitResponse(const std::string &rid)
{
std::unique_lock<std::mutex> ulock(_mutex);
// 当lambda返回true时,才能出来; 注意:this指针只能传值捕捉
// 因为lambda支持拷贝构造,因此lambda可以通过创建副本在该函数外部存活
// 而如果引用捕捉this指针的话,那么在函数外部这个this指针就成为了野指针
// 因此lambda无法捕捉this
_cv.wait(ulock, [&rid, this]() -> bool
{ return _resp_map.count(rid) > 0; });
return _resp_map[rid];
}
muduo::net::TcpConnectionPtr _conn;
ProtobufCodecPtr _codec;
std::string _channel_id;
Consumer::ptr _consumer; // 该信道关联的消费者
std::mutex _mutex;
std::condition_variable _cv;
std::unordered_map<std::string, BasicCommonResponsePtr> _resp_map;
};
3.信道管理模块的实现
下面就是增、删、查
哈希表+互斥锁
class ChannelManager
{
public:
using ptr = std::shared_ptr<ChannelManager>;
Channel::ptr createChannel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
{
std::unique_lock<std::mutex> ulock(_mutex);
Channel::ptr cp = std::make_shared<Channel>(conn, codec);
_channel_map.insert(std::make_pair(cp->cid(), cp));
return cp;
}
void removeChannel(const std::string &channel_id)
{
std::unique_lock<std::mutex> ulock(_mutex);
_channel_map.erase(channel_id);
}
Channel::ptr getChannel(const std::string &channel_id)
{
std::unique_lock<std::mutex> ulock(_mutex);
auto iter = _channel_map.find(channel_id);
if (iter == _channel_map.end())
return Channel::ptr();
return iter->second;
}
private:
std::mutex _mutex;
std::unordered_map<std::string, Channel::ptr> _channel_map;
};
四、异步工作线程模块
#pragma once
#include "../mqhelper/async_pool.hpp"
#include "muduo/net/EventLoopThread.h"
#include <memory>
/*
异步工作线程模块:
1. muduo库中客户端连接的异步循环线程EventLoopThread
2. 一个是当收到消息后进行异步处理的工作线程池
这两项都不是以连接为单元进行创建的,而是创建后,可以用以多个连接中,因此单独进行封装
*/
namespace ns_mq
{
struct AsyncWorker
{
using ptr=std::shared_ptr<AsyncWorker>;
ns_helper::threadpool _pool;
muduo::net::EventLoopThread _loopthread;
};
}
五、连接模块
0.连接模块的细节点
注意:我们的连接模块并不提供查询来获取信道的操作
只支持在创建信道时获取到的新增信道
这是为了保证信道资源的隔离性,就像是学校食堂当中的号码牌只有在申请的时候才能获取,而不支持查询获取一样
就是为了保证人和号码牌的一一对应
这里也是为了保证线程和号码牌的一一对应
只不过这里一个线程可以领多个号码牌,而一个号码牌只能被一个线程所拥有
1.信道池
当然大家可能会想:能不能先早创建一些信道,搞一个信道池呢?
这样就能实现信道复用了啊,就像是学校里面的号码牌就是号码牌池一样…
2.实现信道池,资源隔离会受到影响吗?
RabbitMQ实现信道池的话,资源隔离并不会受到影响,因为实际上信道所实现的资源隔离是一种访问方式上的隔离,通过信道来对线程进行隔离,使得每个线程访问的资源不会出现交集。
不会出现某个线程向某个交换机发送数据时,另一个线程恰好刚删除完该交换机这种现象
因此信道是可以进行复用的,因为信道本身并不包含任何资源,它所使用的资源全都是连接对应资源,所访问操作资源全都是整个broker服务器当中的资源
换言之,信道仅仅只是一个“令牌/中间层”,只是用来隔离使用时对应访问资源的
完全类似于学校食堂当中的号码牌,号码牌本身并不包含资源,仅仅只是用来建立起学生和饭菜一一对应的关系而已,号码牌可以复用,信道也可以
信道池我们就放在扩展版本实现了,因为RabbitMQ本身并没有提供信道池,为了追求设计上的简化,把这个工作交给了我们客户端编写者
信道池的主要思想是通过重用现有的信道来减少创建和销毁信道的开销,这在高并发的场景下尤其有用
而RabbitMQ本身就更适合在高并发的大数据场景下使用
3.网络搭建模块的实现
class Connection
{
public:
Connection(const std::string &server_ip, uint16_t server_port,,const AsyncWorker::ptr& worker)
: _latch(1), _worker(worker), _client(_worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), "Client"), _dispatcher(std::bind(&Connection::OnUnknownCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _channel_manager(std::make_shared<ChannelManager>())
{
_dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::OnCommonResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<BasicConsumeResponse>(std::bind(&Connection::OnConsumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setConnectionCallback(std::bind(OnConnectionCallback, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
connect();
}
Channel::ptr OpenChannel();
void CloseChannel(const Channel::ptr &cp);
private:
void connect()
{
// 1. 客户端发起连接
_client.connect();
// 2. 等待连接建立成功
_latch.wait();
}
void OnUnknownCallback(const muduo::net::TcpConnectionPtr &conn, const ns_google::MessagePtr &message, muduo::Timestamp)
{
default_info("未知请求, 我们将断开该连接");
if (conn->connected())
{
conn->shutdown();
}
}
void OnConnectionCallback(const muduo::net::TcpConnectionPtr &conn)
{
std::ostringstream oss;
if (conn->connected())
{
_conn = conn;
_latch.countDown();
default_info("连接建立成功");
}
else
{
_conn.reset();
default_info("连接断开成功");
}
}
void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp);
void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp);
AsyncWorker::ptr _worker;
muduo::CountDownLatch _latch;
muduo::net::TcpConnectionPtr _conn;
muduo::net::TcpClient _client;
ProtobufDispatcher _dispatcher;
ProtobufCodecPtr _codec;
ChannelManager::ptr _channel_manager;
};
4.打开关闭信道的实现
打开信道:
- 创建信道
- 打开信道
同理,关闭信道:
- 关闭信道
- 销毁信道
void OpenChannel()
{
// 1.创建channel
Channel::ptr cp = _channel_manager->createChannel(_conn, _codec);
// 2. 打开channel
cp->openChannel();
}
void CloseChannel(const Channel::ptr &cp)
{
// 1. 关闭channel
cp->closeChannel();
// 2. 销毁channel
_channel_manager->removeChannel(cp->cid());
}
5.两个响应回调函数的实现
无非就是先找到信道,然后调用其对应的函数
只不过信道的consume函数可以包装一下抛入线程池
void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp)
{
// 找到该信道,然后将该响应添加到对应信道维护的相应哈希表当中
Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());
if (cp.get() == nullptr)
{
default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());
return;
}
cp->putResponse(resp);
}
void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp)
{
// 1.找到信道
Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());
if (cp.get() == nullptr)
{
default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());
return;
}
// 2.将 调用该信道对应的consume任务包装一下抛入线程池
_worker->_pool.put([cp, resp]()
{ cp->consume(resp); });
}
6.完整代码
class Connection
{
public:
using ptr=std::shared_ptr<Connection>;
Connection(const std::string &server_ip, uint16_t server_port,,const AsyncWorker::ptr& worker)
: _latch(1), _worker(worker), _client(_worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), "Client"), _dispatcher(std::bind(&Connection::OnUnknownCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _channel_manager(std::make_shared<ChannelManager>())
{
_dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::OnCommonResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<BasicConsumeResponse>(std::bind(&Connection::OnConsumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.setConnectionCallback(std::bind(OnConnectionCallback, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
connect();
}
Channel::ptr OpenChannel()
{
// 1.创建channel
Channel::ptr cp = _channel_manager->createChannel(_conn, _codec);
// 2. 打开channel
cp->openChannel();
return cp;
}
void CloseChannel(const Channel::ptr &cp)
{
// 1. 关闭channel
cp->closeChannel();
// 2. 销毁channel
_channel_manager->removeChannel(cp->cid());
}
private:
void connect()
{
// 1. 客户端发起连接
_client.connect();
// 2. 等待连接建立成功
_latch.wait();
}
void OnUnknownCallback(const muduo::net::TcpConnectionPtr &conn, const ns_google::MessagePtr &message, muduo::Timestamp)
{
default_info("未知请求, 我们将断开该连接");
if (conn->connected())
{
conn->shutdown();
}
}
void OnConnectionCallback(const muduo::net::TcpConnectionPtr &conn)
{
std::ostringstream oss;
if (conn->connected())
{
_conn = conn;
_latch.countDown();
default_info("连接建立成功");
}
else
{
_conn.reset();
default_info("连接断开成功");
}
}
void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp)
{
// 找到该信道,然后将该响应添加到对应信道维护的相应哈希表当中
Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());
if (cp.get() == nullptr)
{
default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());
return;
}
cp->putResponse(resp);
}
void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp)
{
// 1.找到信道
Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());
if (cp.get() == nullptr)
{
default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());
return;
}
// 2.将 调用该信道对应的consume任务包装一下抛入线程池
_worker->_pool.put([cp, resp]()
{ cp->consume(resp); });
}
AsyncWorker::ptr _worker;
muduo::CountDownLatch _latch;
muduo::net::TcpConnectionPtr _conn;
muduo::net::TcpClient _client;
ProtobufDispatcher _dispatcher;
ProtobufCodecPtr _codec;
ChannelManager::ptr _channel_manager;
};
以上就是项目第十一弹:客户端模块设计与实现的全部内容
原文地址:https://blog.csdn.net/Wzs040810/article/details/140357945
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!