mprpc框架代码详解
protobuf协议
首先,当我们在proto文件中定义了一个Service类service MonitorManager {
rpc SetMonitorInfo(MonitorInfo) returns(google.protobuf.Empty) {}
rpc GetMonitorInfo(QueryMessage) returns(QueryResults) {}
}
经过proto插件自动生成monitor_info.pb.h和monitor_info.pb.cc文件,里面会有MonitorManager类(供服务端使用)和MonitorManager_Stub 类(供客户端使用)
class MonitorManager : public ::PROTOBUF_NAMESPACE_ID::Service {
virtual void SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::MonitorInfo* request,
PROTOBUF_NAMESPACE_ID::Empty* response,
::google::protobuf::Closure* done);
virtual void GetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::QueryMessage* request,
::monitor::proto::QueryResults* response,
::google::protobuf::Closure* done);
void CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done);
};
自动生成的MonitorManager类中包含两个我们定义的函数,它是虚函数,意味着我们不能直接使用这个类,而是需要重写这个类
MonitorManager_Stub 类继承了MonitorManager类
class MonitorManager_Stub : public MonitorManager {
MonitorManager_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
void SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::MonitorInfo* request,
PROTOBUF_NAMESPACE_ID::Empty* response,
::google::protobuf::Closure* done);
void GetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::QueryMessage* request,
::monitor::proto::QueryResults* response,
::google::protobuf::Closure* done);
};
注意这个类初始化的时候,需要指定一个channel,这个channel是用来网络通信的
客户端
下面我们关注MonitorManager_Stub 类,对应上图左边的流程图用户需要自定义一个类,如下:
class RpcClient {
public:
RpcClient(const std::string& server_address = "localhost:50051");
~RpcClient();
void SetMonitorInfo(const monitor::proto::MonitorInfo& monitor_info);
void GetMonitorInfo(monitor::proto::QueryMessage& request,
monitor::proto::QueryResults& response);
void LoginRegister(monitor::proto::UserMessage& request,
monitor::proto::UserResponseMessage& response);
public:
std::unique_ptr<monitor::proto::MonitorManager_Stub> stub_ptr_;
std::unique_ptr<monitor::proto::UserManager_Stub> user_stub_ptr_;
};
RpcClient::RpcClient(const std::string& server_address)
: stub_ptr_(std::make_unique<monitor::proto::MonitorManager_Stub>(
new MprpcChannel())), user_stub_ptr_(std::make_unique<monitor::proto::UserManager_Stub>(
new MprpcChannel())) {}
在这个类中,需要定义MonitorManager_Stub的指针stub_ptr_,初始化的时候需要new MprpcChannel()
以及 客户端需要调用的远程函数
当用户调用SetMonitorInfo函数时,仅仅是调用了stub_ptr_的成员函数
void RpcClient::SetMonitorInfo(
const monitor::proto::MonitorInfo& monitor_info) {
::google::protobuf::Empty response;
MprpcController controller;
stub_ptr_->SetMonitorInfo(&controller, &monitor_info, &response, nullptr);
}
然后调用
void MonitorManager_Stub::SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::MonitorInfo* request,
PROTOBUF_NAMESPACE_ID::Empty* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
这个channel_就是我们刚才新建的MprpcChannel对象
MonitorManager_Stub::MonitorManager_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel)
: channel_(channel), owns_channel_(false) {}
descriptor()->method(0)则指定了调用那个函数,0/1/2/3…代表着不同的函数
这个MethodDescriptor可以获取服务名MonitorManager,和函数名SetMonitorInfo/GetMonitorInfo
service MonitorManager {
rpc SetMonitorInfo(MonitorInfo) returns(google.protobuf.Empty) {}
rpc GetMonitorInfo(QueryMessage) returns(QueryResults) {}
}
然后就会调用Mprpcchannel的CallMethod函数
// 数据格式: header_size + service_name method_name args_size + args
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name();
std::string method_name = method->name();
// 获取参数的序列化字符串长度 args_size
std::string args_str;
int args_size = 0;
if (request->SerializeToString(&args_str)) {
args_size = args_str.size();
} else {
std::cout << "serialize request error!" << std::endl;
controller->SetFailed("seriallize request error!");
return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str)) {
header_size = rpc_header_str.size();
} else {
std::cout << "serialize rpc header error!" << std::endl;
controller->SetFailed("seriallize response error!");
return;
}
// 组织待发送的rpc请求字符串
// ???header_size二进制存储
std::string send_rpc_str;
// 表示 RPC 头部大小的 uint32_t 类型变量 header_size
// 以二进制形式存储到一个字符串中
send_rpc_str.insert(0, std::string((char*)&header_size, 4));
send_rpc_str += rpc_header_str;
send_rpc_str += args_str;
// 使用tcp编程
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (clientfd == -1) {
std::cout << "error:" << errno << std::endl;
char errtxt[512] = {0};
sprintf(errtxt, "create socket error! errno: %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// 获取远程服务器的ip,port
ZkClient zkCli;
zkCli.Start();
// /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "") {
controller->SetFailed(method_path + " is not exist!");
return;
}
int idx = host_data.find(":");
if (idx == -1) {
controller->SetFailed(method_path + " address is invalid!");
return;
}
std::string ip = host_data.substr(0, idx);
if (MprpcApplication::GetInstance().GetConfig().Load("NAT_mode") == "yes") {
ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
}
uint16_t port =
atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
// 连接rpc服务端口
if (connect(clientfd, (struct sockaddr*)&server_addr,
sizeof(server_addr)) == -1) {
std::cout << "connect error! error:" << errno << std::endl;
char errtxt[512] = {0};
sprintf(errtxt, "connect socket error! errno: %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// 发送rpc请求
if (send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1) {
std::cout << "send error! errno:" << errno << std::endl;
char errtxt[512] = {0};
sprintf(errtxt, "send socket error! errno: %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// ?不需要阻塞等待吗
// 接收rpc请求的响应值
char recv_buf[4096] = {};
int recv_size = 0;
if ((recv_size = recv(clientfd, recv_buf, 4096, 0)) == -1) {
std::cout << "recv error! errno:" << errno << std::endl;
char errtxt[512] = {0};
sprintf(errtxt, "recv socket error! errno: %d", errno);
controller->SetFailed(errtxt);
close(clientfd);
return;
}
// 反序列化rpc调用的响应数据
if (!response->ParseFromArray(recv_buf, recv_size)) {
std::cout << "parse error! response str: " << recv_buf << std::endl;
char errtxt[512] = {0};
sprintf(errtxt, "parse error! response str: %s", recv_buf);
controller->SetFailed(errtxt);
return;
}
close(clientfd);
}
发送数据
- 序列化请求:将请求参数和RPC头部信息序列化为字符串。
- 获取服务地址:通过ZooKeeper获取目标服务的IP和端口。
- 建立连接:使用TCP协议与服务端建立连接。
- 发送请求:将序列化的请求数据发送给服务端。
- 接收响应:接收服务端的响应数据并反序列化。
- 错误处理:在各个步骤中进行错误处理,确保调用的健壮性。
服务端
服务端会调用NotifyService()用于获取函数名与函数描述符的对应信息,并存在哈希表中provider.NotifyService(new monitor::ServerManagerImpl());
provider.NotifyService(new monitor::UserManagerImpl());
其中,m_service是服务名描述符,m_method是方法(函数)描述符
因此,可以根据服务名字符串和方法名字符串获取到相应的描述符
当数据发送到服务器后,会执行OnMessage函数,
获取到服务描述符和方法描述符之后,会执行service->CallMethod(method, nullptr, request, response, done)
,其等价于
ServerManagerImpl->CallMethod()
在callmethod函数中:
void MonitorManager::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done) {
GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_monitor_5finfo_2eproto[0]);
switch(method->index()) {
case 0:
SetMonitorInfo(controller,
::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::monitor::proto::MonitorInfo*>(
request),
::PROTOBUF_NAMESPACE_ID::internal::DownCast<PROTOBUF_NAMESPACE_ID::Empty*>(
response),
done);
break;
case 1:
GetMonitorInfo(controller,
::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::monitor::proto::QueryMessage*>(
request),
::PROTOBUF_NAMESPACE_ID::internal::DownCast<::monitor::proto::QueryResults*>(
response),
done);
break;
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}
根据传入的method来确定所调用的函数
当远程函数处理完之后,会有一个response字符串,执行done->Run()
发送给用户
这个done对象在初始化的时候,就会绑定服务端的网络通信接口
google::protobuf::Closure* done =
google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>(
this, &RpcProvider::SendRpcResponse, conn, response);
在执行Run函数时,实际上是调用下述的Run()函数
template <typename Class, typename Arg1, typename Arg2>
class MethodClosure2 : public Closure {
public:
typedef void (Class::*MethodType)(Arg1 arg1, Arg2 arg2);
MethodClosure2(Class* object, MethodType method, bool self_deleting,
Arg1 arg1, Arg2 arg2)
: object_(object), method_(method), self_deleting_(self_deleting),
arg1_(arg1), arg2_(arg2) {}
~MethodClosure2() {}
void Run() override {
bool needs_delete = self_deleting_; // read in case callback deletes
(object_->*method_)(arg1_, arg2_);
if (needs_delete) delete this;
}
private:
Class* object_;
MethodType method_;
bool self_deleting_;
Arg1 arg1_;
Arg2 arg2_;
};
原文地址:https://blog.csdn.net/henghuizan2771/article/details/144748164
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!