自学内容网 自学内容网

CyberRT通信介绍与基于Reader、Writer的通信实践(apollo9.0)

目录

数据通信场景

CyberRT中的通信方式 

​编辑

通信模式

话题通信

服务通信

参数通信

protobuf

protobuf简介

 protobuf文件编写

topic通信实验

实验环境

实验准备

代码编写

定义消息格式

发送消息

接收消息

定义编译规则

程序编译

运行程序


数据通信场景

数据间通信一般分三种情况,分别是

  • 进程内通信
  • 进程间通信
  • 跨主机通信

针对三种不同的通信场景也有不同的通信方式,如

  • 进程内通信方式:对象指针、函数
  • 进程间通信方式:共享内存、管道、消息队列等
  • 跨主机进程间通信方式:RTPS、GRPC等

CyberRT中的通信方式 

以下是CyberRT中针对不同通信场景所使用的通信方式

但是一般情况下我们是不知道通信对方是与我们同一个进程还是不同进程甚至是否一个主机的,而CyberRT对此做出以下判断

  • 判断对端ip与本进程的ip是否相同,如果不同则采用跨主机通信
  • 判断对端进程id与本进程id是否相同,如果不同则采用进程间通信
  • 如果前两者都相同,则采用进程内通信方式

通信模式

CyberRT中的数据通信模式主要有三种:

  • Writer/Reader:发布订阅者模式
  • Service/Client:服务客户通信模式
  • Parameter Server-Client:参数服务客户通信模式

话题通信

模式

以发布订阅的方式实现不同节点之间数据交互的通信模式。

如图所示

  • Listener-Talker通信首先创建了两个Node,分别是Talker Node和 Listener Node。
  • 每个Node实例化Writer类和Reader类对Channel进行消息的读写。
  • Writer和Reader通过Topic连接,对同一块共享内存(Channel)进行读写处理。
  • Talker Node 为了实现其“诉说”的功能,实例化Writer,通过Writer来对Channel进行消息写操作。
  • Listener Node为了实现其“聆听”功能,实例化reader类,通过Reader来对channel进行读操作。

场景

话题通信方式适合于持续性通信的应用场景,比如雷达信号,摄像头图像信息这类数据的传输。

使用:

Listener-Talker通信一方主动送消息,一方被动接收。

我们想要一直获取车的速度,该需求不需要向发送方返回什么消息,也不需要发送方对消息进行进一步处理。所以我们选择了Listener-Talker通信方式实现该功能。

数据定义:

话题通信中用的的数据格式的定义car message 定义在car.proto中。

服务通信

模式

以请求响应的方式实现不同节点之间数据交互的通信模式。

如图所示,Server-Client通信可以在客户端发出消息请求时,服务端才进⾏请求回应,并将客户端所需的数据返回给客户端。

场景

我们想要获得⼩⻋的详细信息,⽐如⻋牌这些,但是⼜不需要⼀直获得该信息,想要在需要知道这些信息的时候请求⼀下就好,于是考虑⽤Server- Client通信实现该功能。

使用:

该通信模式适合临时的消息传输,适⽤于不需要持续性发送数据的场景。

数据定义:

其传输的数据定义依然在对应的proto⽂件中。

参数通信

模式

以共享的方式实现不同节点之间数据交互的通信模式。

参数服务器是基于服务实现的,包含客户端和服务器端,服务端节点可以存储数据,客户端节点可以访问服务端节点操作数据,这个过程虽然基于请求响应的,但是无需自己实现请求与响应,此过程已经被封装,调用者只需要通过比较简单友好的API就可以实现参数操作。

场景

自动驾驶场景中有一些参数比如该车的最高限速、最多乘客以及是否自动驾驶等需要被各个模块使用数据,比如是否自动驾驶这个参数可能同时影响这很多模块,也可能被很多模块运行时所更改。

这些数据如何实现在不同模块之间的共享呢?

使用:

类似于“全局变量”的方式来存储这些参数,并定义一些自定义参数来进行使用。

数据定义:

Cyber中设计了全局参数服务器来实现这个功能,其通信基于RTPS协议。该通信方式服务端和客户端都可以设置参数和更改参数。

protobuf

protobuf简介

Protobuf 是 Google 公司开发的一种跨语言和平台的序列化数据结构的方式,是一个灵活的、高效的用于序列化数据的协议,与 XML 和 JSON 格式相比,Protobuf 更小、更快、更便捷。

Protobuf 是跨语言的,并且自带一个编译器( protoc ),只需要用protoc进行编译,就可以编译成 Java、Python、C++、C#、Go 等多种语言代码,然后可以直接使用,不需要再写其它代码,自带有解析的代码。只需要将要被序列化的结构化数据定义一次(在 .proto 文件定义),便可以使用特别生成的源代码(使用protobuf提供的生成工具)轻松的使用不同的数据流完成对结构数据的读写操作。甚至可以更新 .proto 文件中对数据结构的定义而不会破坏依赖旧格式编译出来的程序。其优点如下:

  • 性能效率高序列化后字节占用空间比 XML 少3-10倍,序列化的时间效率比 XML 快20-100倍。

  • 使用便捷便捷:将对结构化数据的操作封装成一个类,便于使用。

  • 兼容性高:通信两方使用同一数据协议,当有一方修改了数据结构,不会影响另一方的使用。

  • 跨语言:支持 Java,C++,Python、Go、Ruby 等多种语言。

 protobuf文件编写示例

Protobuf有几个部分构成:

  • syntax :表示使用Protobuf的版本,目前Protobuf支持proto3,但在Apollo中使用的是proto2;

  • package: 表示该文件的路径

  • message:表示一种数据结构,message后面跟的是数据结构名字,括号里的字段定义格式为:字段规则 数据类型 字段名称 字段编号。

字段规则主要有三种

  • required:调用时必须提供该字段的值,否则该消息被视为“未初始化”,官方不建议使用,当把字段规则改为其他规则会存在兼容性问题。

  • optional:该字段的值可以设置也可以不设置,会根据数据类型生成一个默认的值

  • repeated类似于动态数组,可以存储多个同类型的数据

如下示例

# examples.proto
syntax = "proto2";
package apollo.cyber.examples.proto;
message SamplesTest1 {
  optional string class_name = 1;
  optional string case_name = 2;
};
message Chatter {
  optional uint64 timestamp = 1;
  optional uint64 lidar_timestamp = 2;
  optional uint64 seq = 3;
  optional bytes content = 4;
};
message Driver {
  optional string content = 1;
  optional uint64 msg_id = 2;
  optional uint64 timestamp = 3;
};

topic通信实验

实验环境

apollo9.0,安装见官网说明,安装完成后进入容器即可

apollo.baidu.com/community/Apollo-Homepage-Document?doc=BYFxAcGcC4HpYIbgPYBtXIHQCMEEsATAV0wGNkBbWA5UyRFdZWVBEAU0hFgoIH0adPgCY%2BADwCiAVnEBBCeIAcATnETFcgMxKZkgGxKAwkoDsa3YoAi45WdGSLxsYt0SzY%2BXICMa98oAMSgYALF7%2B2NhemsLBJsrCYZqKwors7AikBIp6miYmpFJSXpigFKhAA

实验准备

创建代码结构目录

buildtool create --template component communication
touch /apollo_workspace/communication/listener.cc
touch /apollo_workspace/communication/talker.cc

 最终的实验目录结构如下

communication
 ├── BUILD
 ├── communication.cc
 ├── communication.h
 ├── conf
 │   ├── communication.conf
 │   └── communication.pb.txt
 ├── cyberfile.xml
 ├── dag
 │   └── communication.dag
 ├── launch
 │   └── communication.launch
 ├── listener.cc <必须有>
 ├── proto <必须有>
 │   ├── BUILD <必须有>
 │   └── communication.proto <必须有>
 └── talker.cc <必须有>

代码编写

定义消息格式

编写proto/communication.proto文件

syntax = "proto2";
 
package apollo.communication.proto;
 
 //定义一个车的消息,车的型号,车主,车的车牌号,已跑公里数,车速
 message Car{
     optional string plate = 1;
     optional string type = 2;
     optional string owner = 3;
     optional uint64 kilometers = 4;
     optional uint64 speed = 5;
 };

发送消息

编写发送方 talker 代码,talker.cc 代码如下

#include "communication/proto/communication.pb.h"
 #include "cyber/cyber.h"
 #include "cyber/time/rate.h"
 
 //car数据定义的引用,可以看出其定义来源于一个proto
 using apollo::communication::proto::Car;
 
 int main(int argc, char *argv[]) {
   // 初始化一个cyber框架
   apollo::cyber::Init(argv[0]);
   // 创建talker节点
   auto talker_node = apollo::cyber::CreateNode("talker");
   // 从节点创建一个Topic,来实现对车速的查看
   auto talker = talker_node->CreateWriter<Car>("car_speed");
   AINFO << "I'll start telling you the current speed of the car.";
 
   //设置初始速度为0,然后速度每秒增加5km/h
   uint64_t speed = 0;
   while (apollo::cyber::OK()) {
       auto msg = std::make_shared<Car>();
       msg->set_speed(speed);
       //假设车速持续增加
       speed += 5;
       talker->Write(msg);
       sleep(1);
   }
   return 0;
 }

接收消息

编写接受方 listener 代码,listener.cc 代码如下

#include "communication/proto/communication.pb.h"
 #include "cyber/cyber.h"
 
 using apollo::communication::proto::Car;
 
 //接收到消息后的响应函数
 void message_callback(
         const std::shared_ptr<Car>& msg) {
     AINFO << "now speed is: " << msg->speed();
 }
 
 int main(int argc, char* argv[]) {
     //初始化cyber框架
     apollo::cyber::Init(argv[0]);
     //创建监听节点
     auto listener_node = apollo::cyber::CreateNode("listener");
     //创建监听响应进行消息读取
     auto listener = listener_node->CreateReader<Car>(
             "car_speed", message_callback);
     apollo::cyber::WaitForShutdown();
     return 0;
 }

定义编译规则

修改 BUILD 文件,将新写的代码加入到编译中,communication/BUILD文件修改如下

load("//tools:apollo_package.bzl", "apollo_cc_library", "apollo_cc_binary", "apollo_package", "apollo_component")
load("//tools:cpplint.bzl", "cpplint")
 
package(default_visibility = ["//visibility:public"])
 
apollo_cc_binary(
    name = "talker",
    srcs = ["talker.cc"],
    deps = [
        "//cyber",
        "//communication/proto:communication_proto",
    ],
    linkstatic = True,
)
apollo_cc_binary(
    name = "listener",
    srcs = ["listener.cc"],
    deps = [
        "//cyber",
        "//communication/proto:communication_proto",
    ],
    linkstatic = True,
)
 
apollo_package()
 
cpplint()

程序编译

//回到 /apollo_workspace目录下编译
cd /apollo_workspace
buildtool build -p communication

 编译成功显示如下:

运行程序

打开一个终端,运行talker程序

// 设置将输出结果到控制台
export GLOG_alsologtostderr=1
// 编译产生的可执行文件在 /opt/apollo/neo/bin/
cd /opt/apollo/neo/bin/
// 执行talker
./talker

 重新打开一个终端,运行listener程序

// 设置将输出结果到控制台
export GLOG_alsologtostderr=1
// 编译产生的可执行文件在 /opt/apollo/neo/bin/
cd /opt/apollo/neo/bin/
// 执行listener
./listener

运行结果如下图所示,左侧是talker发送消息,右侧是listener接收消息

参考

apollo.baidu.com/community/Apollo-Homepage-Document?doc=BYFxAcGcC4HpYIbgPYBtXIHQCMEEsATAV0wGNkBbWA5UyRFdZWVBEAU0hFgoIH0adPgCY%2BADwCiAVnEBBCeIAcATnETFcgMxKZkgGxKAwkoDsa3YoAi45WdGSLxsYt0SzY%2BXICMa98oAMSgYALF7%2B2NhemsLBJsrCYZqKwors7AikBIp6miYmpFJSXpigFKhAA 


原文地址:https://blog.csdn.net/qq_58158950/article/details/143064434

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!