【微服务即时通讯系统】——brpc远程过程调用、百度开源的RPC框架、brpc的介绍、brpc的安装、brpc使用和功能测试
文章目录
brpc
1. brpc的介绍
1.1 rpc的介绍
RPC(Remote Procedure Call)远程过程调用,是一种计算机通信协议,它允许程序在不同的计算机之间进行通信和交互,就像本地调用一样。
RPC可以屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
1.2 rpc的原理
服务提供者实现特定业务逻辑并注册到服务中心。
服务中心接收注册信息,为服务消费者提供服务发现通知和负载均衡功能。
服务消费者向服务中心查询服务订阅后,通过 RPC 机制发起对服务提供者的远程调用,接收并处理结果。
rpc一次调用的过程
在 RPC(远程过程调用)机制中,客户端以接口方式调用服务。客户端存根在接收到调用请求后,将方法、入参等信息组装并序列化成可网络传输的消息体(二进制流),然后找到远程服务地址,通过网络(sockets)将消息发送给服务端。
服务端存根收到消息后进行反序列化操作(将二进制流反序列化为消息对象),接着调用本地服务进行处理。服务端将处理结果返回给服务端存根,服务端存根序列化处理结果(将结果消息对象序列化为二进制流)。
再通过网络(sockets)发送至客户端。客户端存根接收到消息后进行反序列化解码(将结果二进制流反序列化为消息对象),最终客户端得到结果。
1.3 grpc和brpc
gRPC
由 Google 开发并开源。
支持 C++、Java、Python、Go 。
使用 Protocol Buffers 作为默认的接口定义语言和数据序列化格式。
基于 HTTP/2 协议,支持双向流和多路复用,能够有效地提高网络传输效率。
适用于构建分布式系统、微服务架构等场景。
bRPC
由 Baidu 开发并开源。
只支持 C++ 。
使用 Protocol Buffers 作为默认的接口定义语言和数据序列化格式。
可以在一个端口上支持多协议服务,如 HTTP/HTTPS、Redis、Thrift 等,具有很高的灵活性。
提供了一些独特的功能,如通过 HTTP 界面调试服务、使用各种性能分析工具等。
适用于搜索、存储、机器学习等高性能系统的开发。
2. brpc的安装
在 Linux 系统(ubuntu) 上安装 brpc 的基本步骤:
先安装依赖:
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装 brpc:
git clone https://github.com/apache/brpc.git
cd brpc/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
make && sudo make install
3. brpc使用
3.1 brpc接口介绍
日志输出类与接口:
包含头文件: #include <butil/logging.h>
namespace logging {
enum LoggingDestination {
LOG_TO_NONE = 0
};
struct BUTIL_EXPORT LoggingSettings {
LoggingSettings();
LoggingDestination logging_dest;
};
bool InitLogging(const LoggingSettings& settings);
}
protobuf 类与接口:
namespace google {
namespace protobuf {
class PROTOBUF_EXPORT Closure {
public:
Closure() {}
virtual ~Closure();
virtual void Run() = 0;
};
inline Closure* NewCallback(void (*function)());
class PROTOBUF_EXPORT RpcController {
bool Failed();
std::string ErrorText() ;
}
}
}
服务端类与接口:
namespace brpc {
struct ServerOptions {
//无数据传输,则指定时间后关闭连接
int idle_timeout_sec; // Default: -1 (disabled)
int num_threads; // Default: #cpu-cores
//....
}
enum ServiceOwnership {
//添加服务失败时,服务器将负责删除服务对象
SERVER_OWNS_SERVICE,
//添加服务失败时,服务器也不会删除服务对象
SERVER_DOESNT_OWN_SERVICE
};
class Server {
int AddService(google::protobuf::Service* service,
ServiceOwnership ownership);
int Start(int port, const ServerOptions* opt);
int Stop(int closewait_ms/*not used anymore*/);
int Join();
//休眠直到 ctrl+c 按下,或者 stop 和 join 服务器
void RunUntilAskedToQuit();
}
class ClosureGuard {
explicit ClosureGuard(google::protobuf::Closure* done);
~ClosureGuard() { if (_done) _done->Run(); }
}
class HttpHeader {
void set_content_type(const std::string& type)
const std::string* GetHeader(const std::string& key)
void SetHeader(const std::string& key,
const std::string& value);
const URI& uri() const { return _uri; }
HttpMethod method() const { return _method; }
void set_method(const HttpMethod method)
int status_code()
void set_status_code(int status_code);
}
class Controller : public google::protobuf::RpcController {
void set_timeout_ms(int64_t timeout_ms);
void set_max_retry(int max_retry);
google::protobuf::Message* response();
HttpHeader& http_response();
HttpHeader& http_request();
bool Failed();
std::string ErrorText();
using AfterRpcRespFnType = std::function<
void(Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res)>;
void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn)
}
客户端类与接口:
namespace brpc {
struct ChannelOptions {
//请求连接超时时间
int32_t connect_timeout_ms;// Default: 200 (milliseconds)
//rpc 请求超时时间
int32_t timeout_ms;// Default: 500 (milliseconds)
//最大重试次数
int max_retry;// Default: 3
//序列化协议类型 options.protocol = "baidu_std";
AdaptiveProtocolType protocol;
//....
}
class Channel : public ChannelBase {
//初始化接口,成功返回 0;
int Init(const char* server_addr_and_port,
const ChannelOptions* options);
4. brpc使用测试
4.1 brpc同步和异步调用
server.hpp
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
// 1. 继承于EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServiceImpl : public example::EchoService
{
public:
EchoServiceImpl(){}
~EchoServiceImpl(){}
void Echo(google::protobuf::RpcController* controller,
const ::example::EchoRequest* request,
::example::EchoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard rpc_guard(done); // 对象析构时自动调用done->Run()
std::cout << "收到消息:" << request->message() << std::endl;
std::string str = request->message() + "--这是响应!!";
response->set_message(str);
//done->Run();
}
};
int main(int argc, char *argv[])
{
// 关闭brpc的默认日志输出
logging::LoggingSettings settings;
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(settings);
// 2. 构造服务器对象
brpc::Server server;
// 3. 向服务器对象中,新增EchoService服务
EchoServiceImpl echo_service; // 添加服务失败时,服务器不会删除服务对象
int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
if (ret == -1)
{
std::cout << "添加Rpc服务失败!\n";
return -1;
}
// 4. 启动服务器
brpc::ServerOptions options;
options.idle_timeout_sec = -1; // 连接空闲超时时间-超时后连接被关闭
options.num_threads = 1; // io线程数量
ret = server.Start(8080, &options);
if (ret == -1)
{
std::cout << "启动服务器失败!\n";
return -1;
}
server.RunUntilAskedToQuit(); // 休眠等待运行结束,避免对SIGINT信号进行处理
return 0;
}
makefile:
all : server client
server : server.cc main.pb.cc
g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client : client.cc main.pb.cc
g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
同步调用:同步调用是指客户端会阻塞收到 server 端的响应或发生错误。
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
int main(int argc, char *argv[])
{
// 1. 构造Channel信道,连接服务器
brpc::ChannelOptions options;
options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待
options.timeout_ms = -1; // rpc请求等待超时时间,-1表示一直等待
options.max_retry = 3; // 请求重试次数
options.protocol = "baidu_std"; // 序列化协议,默认使用baidu_std
brpc::Channel channel;
int ret = channel.Init("127.0.0.1:8080", &options);
if (ret == -1)
{
std::cout << "初始化信道失败!\n";
return -1;
}
// 2. 构造EchoService_Stub对象,用于进行rpc调用
example::EchoService_Stub stub(&channel);
// 3. 进行Rpc调用
example::EchoRequest req;
req.set_message("你好~brpc~!");
brpc::Controller cntl;
example::EchoResponse rsp;
stub.Echo(&cntl, &req, &rsp, nullptr); // 同步调用
if (cntl.Failed() == true) {
std::cout << "Rpc调用失败:" << cntl.ErrorText() << std::endl;
return -1;
}
std::cout << "同步调用结束!\n";
std::cout << "收到响应: " << rsp.message() << std::endl;
return 0;
}
异步调用:异步调用是指客户端注册一个响应处理回调函数, 当调用一个 RPC 接口时立即返回,不会阻塞等待响应, 当 server 端返回响应时会调用传入的回调函数处理响应。
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
// 这个回调函数用于异步调用对象使用
void callback(brpc::Controller* cntl, ::example::EchoResponse* response)
{
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> resp_guard(response);
if (cntl->Failed() == true)
{
std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应: " << response->message() << std::endl;
// delete cntl;
// delete response;
}
int main(int argc, char *argv[])
{
// 1. 构造Channel信道,连接服务器
brpc::ChannelOptions options;
options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待
options.timeout_ms = -1; // rpc请求等待超时时间,-1表示一直等待
options.max_retry = 3; // 请求重试次数
options.protocol = "baidu_std"; // 序列化协议,默认使用baidu_std
brpc::Channel channel;
int ret = channel.Init("127.0.0.1:8080", &options);
if (ret == -1)
{
std::cout << "初始化信道失败!\n";
return -1;
}
// 2. 构造EchoService_Stub对象,用于进行rpc调用
example::EchoService_Stub stub(&channel);
// 3. 进行Rpc调用
example::EchoRequest req;
req.set_message("你好~brpc~!");
// 防止异步调用时 cnt rsp 出作用域被销毁,要在堆上创建对象
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
auto clusure = google::protobuf::NewCallback(callback, cntl, rsp); // 异步调用对象
stub.Echo(cntl, &req, rsp, clusure); // 异步调用
std::cout << "异步调用结束!\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
原文地址:https://blog.csdn.net/Crocodile1006/article/details/142533487
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!