自学内容网 自学内容网

mprpc框架代码详解

protobuf协议

首先,当我们在proto文件中定义了一个Service类
service MonitorManager {
rpc SetMonitorInfo(MonitorInfo) returns(google.protobuf.Empty) {}
rpc GetMonitorInfo(QueryMessage) returns(QueryResults) {}
}

经过proto插件自动生成monitor_info.pb.h和monitor_info.pb.cc文件,里面会有MonitorManager类(供服务端使用)和MonitorManager_Stub 类(供客户端使用)

class MonitorManager : public ::PROTOBUF_NAMESPACE_ID::Service {
virtual void SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::MonitorInfo* request,
PROTOBUF_NAMESPACE_ID::Empty* response,
::google::protobuf::Closure* done);

virtual void GetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::monitor::proto::QueryMessage* request,
::monitor::proto::QueryResults* response,
::google::protobuf::Closure* done);

void CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done);
};

自动生成的MonitorManager类中包含两个我们定义的函数,它是虚函数,意味着我们不能直接使用这个类,而是需要重写这个类

MonitorManager_Stub 类继承了MonitorManager类

class MonitorManager_Stub : public MonitorManager {
  MonitorManager_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);

  void SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::monitor::proto::MonitorInfo* request,
                       PROTOBUF_NAMESPACE_ID::Empty* response,
                       ::google::protobuf::Closure* done);
  void GetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::monitor::proto::QueryMessage* request,
                       ::monitor::proto::QueryResults* response,
                       ::google::protobuf::Closure* done);
};

注意这个类初始化的时候,需要指定一个channel,这个channel是用来网络通信的

客户端

下面我们关注MonitorManager_Stub 类,对应上图左边的流程图

用户需要自定义一个类,如下:

class RpcClient {
   public:
    RpcClient(const std::string& server_address = "localhost:50051");
    ~RpcClient();
    void SetMonitorInfo(const monitor::proto::MonitorInfo& monitor_info);
    void GetMonitorInfo(monitor::proto::QueryMessage& request,
                        monitor::proto::QueryResults& response);

    void LoginRegister(monitor::proto::UserMessage& request,
                       monitor::proto::UserResponseMessage& response);
   public:
    std::unique_ptr<monitor::proto::MonitorManager_Stub> stub_ptr_;
    std::unique_ptr<monitor::proto::UserManager_Stub> user_stub_ptr_;
};

RpcClient::RpcClient(const std::string& server_address)
    : stub_ptr_(std::make_unique<monitor::proto::MonitorManager_Stub>(
          new MprpcChannel())), user_stub_ptr_(std::make_unique<monitor::proto::UserManager_Stub>(
          new MprpcChannel())) {}

在这个类中,需要定义MonitorManager_Stub的指针stub_ptr_,初始化的时候需要new MprpcChannel()

以及 客户端需要调用的远程函数

当用户调用SetMonitorInfo函数时,仅仅是调用了stub_ptr_的成员函数

void RpcClient::SetMonitorInfo(
    const monitor::proto::MonitorInfo& monitor_info) {
    ::google::protobuf::Empty response;
    MprpcController controller;
    stub_ptr_->SetMonitorInfo(&controller, &monitor_info, &response, nullptr);
}

然后调用

void MonitorManager_Stub::SetMonitorInfo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                              const ::monitor::proto::MonitorInfo* request,
                              PROTOBUF_NAMESPACE_ID::Empty* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}

这个channel_就是我们刚才新建的MprpcChannel对象

MonitorManager_Stub::MonitorManager_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel)
  : channel_(channel), owns_channel_(false) {}

descriptor()->method(0)则指定了调用那个函数,0/1/2/3…代表着不同的函数

这个MethodDescriptor可以获取服务名MonitorManager,和函数名SetMonitorInfo/GetMonitorInfo

service MonitorManager {
rpc SetMonitorInfo(MonitorInfo) returns(google.protobuf.Empty) {}
rpc GetMonitorInfo(QueryMessage) returns(QueryResults) {}
}

然后就会调用Mprpcchannel的CallMethod函数

// 数据格式: header_size + service_name method_name args_size + args
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
    const google::protobuf::ServiceDescriptor* sd = method->service();
    std::string service_name = sd->name();
    std::string method_name = method->name();

    // 获取参数的序列化字符串长度 args_size
    std::string args_str;
    int args_size = 0;

    if (request->SerializeToString(&args_str)) {
        args_size = args_str.size();
    } else {
        std::cout << "serialize request error!" << std::endl;
        controller->SetFailed("seriallize request error!");
        return;
    }

    // 定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str)) {
        header_size = rpc_header_str.size();
    } else {
        std::cout << "serialize rpc header error!" << std::endl;
        controller->SetFailed("seriallize response error!");
        return;
    }
    // 组织待发送的rpc请求字符串
    // ???header_size二进制存储
    std::string send_rpc_str;
    // 表示 RPC 头部大小的 uint32_t 类型变量 header_size
    // 以二进制形式存储到一个字符串中
    send_rpc_str.insert(0, std::string((char*)&header_size, 4));
    send_rpc_str += rpc_header_str;
    send_rpc_str += args_str;

    // 使用tcp编程
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1) {
        std::cout << "error:" << errno << std::endl;
        char errtxt[512] = {0};
        sprintf(errtxt, "create socket error! errno: %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }
    // 获取远程服务器的ip,port
    ZkClient zkCli;
    zkCli.Start();
    // /UserServiceRpc/Login
    std::string method_path = "/" + service_name + "/" + method_name;
    // 127.0.0.1:8000
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "") {
        controller->SetFailed(method_path + " is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1) {
        controller->SetFailed(method_path + " address is invalid!");
        return;
    }
    std::string ip = host_data.substr(0, idx);

    if (MprpcApplication::GetInstance().GetConfig().Load("NAT_mode") == "yes") {
        ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    }

    uint16_t port =
        atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    // 连接rpc服务端口
    if (connect(clientfd, (struct sockaddr*)&server_addr,
                sizeof(server_addr)) == -1) {
        std::cout << "connect error! error:" << errno << std::endl;
        char errtxt[512] = {0};
        sprintf(errtxt, "connect socket error! errno: %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    // 发送rpc请求
    if (send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1) {
        std::cout << "send error! errno:" << errno << std::endl;
        char errtxt[512] = {0};
        sprintf(errtxt, "send socket error! errno: %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    // ?不需要阻塞等待吗
    // 接收rpc请求的响应值
    char recv_buf[4096] = {};
    int recv_size = 0;
    if ((recv_size = recv(clientfd, recv_buf, 4096, 0)) == -1) {
        std::cout << "recv error! errno:" << errno << std::endl;
        char errtxt[512] = {0};
        sprintf(errtxt, "recv socket error! errno: %d", errno);
        controller->SetFailed(errtxt);
        close(clientfd);
        return;
    }

    // 反序列化rpc调用的响应数据
    if (!response->ParseFromArray(recv_buf, recv_size)) {
        std::cout << "parse error! response str: " << recv_buf << std::endl;
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response str: %s", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }
    close(clientfd);
}

发送数据

  1. 序列化请求:将请求参数和RPC头部信息序列化为字符串。
  2. 获取服务地址:通过ZooKeeper获取目标服务的IP和端口。
  3. 建立连接:使用TCP协议与服务端建立连接。
  4. 发送请求:将序列化的请求数据发送给服务端。
  5. 接收响应:接收服务端的响应数据并反序列化。
  6. 错误处理:在各个步骤中进行错误处理,确保调用的健壮性。

服务端

服务端会调用NotifyService()用于获取函数名与函数描述符的对应信息,并存在哈希表中
provider.NotifyService(new monitor::ServerManagerImpl());
provider.NotifyService(new monitor::UserManagerImpl());

其中,m_service是服务名描述符,m_method是方法(函数)描述符

因此,可以根据服务名字符串和方法名字符串获取到相应的描述符

当数据发送到服务器后,会执行OnMessage函数,

获取到服务描述符和方法描述符之后,会执行service->CallMethod(method, nullptr, request, response, done),其等价于

ServerManagerImpl->CallMethod()

在callmethod函数中:

void MonitorManager::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                             ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                             const ::PROTOBUF_NAMESPACE_ID::Message* request,
                             ::PROTOBUF_NAMESPACE_ID::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_monitor_5finfo_2eproto[0]);
  switch(method->index()) {
    case 0:
      SetMonitorInfo(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::monitor::proto::MonitorInfo*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<PROTOBUF_NAMESPACE_ID::Empty*>(
                 response),
             done);
      break;
    case 1:
      GetMonitorInfo(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::monitor::proto::QueryMessage*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::monitor::proto::QueryResults*>(
                 response),
             done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}

根据传入的method来确定所调用的函数

当远程函数处理完之后,会有一个response字符串,执行done->Run()发送给用户

这个done对象在初始化的时候,就会绑定服务端的网络通信接口

    google::protobuf::Closure* done =
        google::protobuf::NewCallback<RpcProvider,
                                      const muduo::net::TcpConnectionPtr&,
                                      google::protobuf::Message*>(
            this, &RpcProvider::SendRpcResponse, conn, response);

在执行Run函数时,实际上是调用下述的Run()函数

template <typename Class, typename Arg1, typename Arg2>
class MethodClosure2 : public Closure {
 public:
  typedef void (Class::*MethodType)(Arg1 arg1, Arg2 arg2);

  MethodClosure2(Class* object, MethodType method, bool self_deleting,
                 Arg1 arg1, Arg2 arg2)
    : object_(object), method_(method), self_deleting_(self_deleting),
      arg1_(arg1), arg2_(arg2) {}
  ~MethodClosure2() {}

  void Run() override {
    bool needs_delete = self_deleting_;  // read in case callback deletes
    (object_->*method_)(arg1_, arg2_);
    if (needs_delete) delete this;
  }

 private:
  Class* object_;
  MethodType method_;
  bool self_deleting_;
  Arg1 arg1_;
  Arg2 arg2_;
};


原文地址:https://blog.csdn.net/henghuizan2771/article/details/144748164

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!