自学内容网 自学内容网

Robot Operating System——消息的序列化和反序列化

在ROS 2中,提供了一种叫做SerializedMessage的消息。我们可以将C类型或者C++类型变量序列化后发送,然后在接收端再反序列化还原。这种方案在有别于我们常见的通过Json串进行序列化和反序列化。它在底层会使用类型的内存模型去做数据拼装,而不是转成某种可视化的字符串,所以它的效率更高些。

本文我们将通过案例看看消息序列化和反序列化的使用。

在消息发布者中序列化

我们借用demo_nodes_cpp的案例talker_serialized_message(demo_nodes_cpp/src/topics/talker_serialized_message.cpp)来讲解。

它是一个叫做serialized_message_talker的消息发布者类

class SerializedMessageTalker : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit SerializedMessageTalker(const rclcpp::NodeOptions & options)
  : Node("serialized_message_talker", options),
    serialized_msg_(0u)
  {
  ……
    rclcpp::QoS qos(rclcpp::KeepLast(7));
    pub_ = this->create_publisher<std_msgs::msg::String>("chatter", qos);

    // Use a timer to schedule periodic message publishing.
    timer_ = this->create_wall_timer(1s, publish_message);
  }

它会通过定时器timer_每隔一段时间,通过发布者pub_将序列化后的消息serialized_msg_发送到订阅端。需要注意的是,此时发布者pub_的模板参数是std_msgs::msg::String,而不是SerializedMessage 。这是因为本例中SerializedMessage消息实则是对std_msgs::msg::String类型内容进行了打包。

private:
  size_t count_ = 1;
  rclcpp::SerializedMessage serialized_msg_;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

我们主要关注定时发送消息的回调函数的实现

    // Create a function for when messages are to be sent.
    auto publish_message =
      [this]() -> void
      {
        // In this example we send a std_msgs/String as serialized data.
        // This is the manual CDR serialization of a string message with the content of
        // Hello World: <count_> equivalent to talker example.
        // The serialized data is composed of a 8 Byte header
        // plus the length of the actual message payload.
        // If we were to compose this serialized message by hand, we would execute the following:
        // rcutils_snprintf(
        //   serialized_msg_.buffer, serialized_msg_.buffer_length, "%c%c%c%c%c%c%c%c%s %zu",
        //   0x00, 0x01, 0x00, 0x00, 0x0f, 0x00, 0x00, 0x00, "Hello World:", count_++);

        // In order to ease things up, we call the rmw_serialize function,
        // which can do the above conversion for us.
        // For this, we initially fill up a std_msgs/String message and fill up its content

这段说明讲的很明白,回调函数会将std_msgs::msg::String消息序列化成二进制内存块。被序列化后的内存是原来对象长度加上前置的8个字节。我们可以通过注释中的例子手工构造这段内存,但是后面的代码提供了更友好的方法。

        auto string_msg = std::make_shared<std_msgs::msg::String>();
        string_msg->data = "Hello World:" + std::to_string(count_++);

        // We know the size of the data to be sent, and thus can pre-allocate the
        // necessary memory to hold all the data.
        // This is specifically interesting to do here, because this means
        // no dynamic memory allocation has to be done down the stack.
        // If we don't allocate enough memory, the serialized message will be
        // dynamically allocated before sending it to the wire.
        auto message_header_length = 8u;
        auto message_payload_length = static_cast<size_t>(string_msg->data.size());
        serialized_msg_.reserve(message_header_length + message_payload_length);

        static rclcpp::Serialization<std_msgs::msg::String> serializer;
        serializer.serialize_message(string_msg.get(), &serialized_msg_);

这段代码拼接了“Hello World!”和一个自增变量。这构成我们需要传递给订阅者的消息主体。

中间一段代码是给serialized_msg_分配空间。因为已经提前知道对象大小,以及因为序列化要添加的8个字节头,所以很容易算出要分配的空间大小。但是需要说明的是,这个空间此时不用分配也可以。因为后续序列化方法serialize_message会自动给serialized_msg_分配足够的内存,我们将会在《》中分析其实现。

最后一段代码构建了一个局部静态的序列化器serializer。它的模板参数就是我们需要序列化的对象类型std_msgs::msg::String。然后给它的序列化方法serialize_message传递待序列化的对象的地址空间以及序列化消息对象serialized_msg_的地址。这样我们的消息机会被打包到serialized_msg_中了。

最后我们将序列化的消息发布出去。

        // For demonstration we print the ROS 2 message format
        printf("ROS message:\n");
        printf("%s\n", string_msg->data.c_str());
        // And after the corresponding binary representation
        printf("serialized message:\n");
        for (size_t i = 0; i < serialized_msg_.size(); ++i) {
          printf("%02x ", serialized_msg_.get_rcl_serialized_message().buffer[i]);
        }
        printf("\n");

        pub_->publish(serialized_msg_);
      };

完整代码

// Copyright 2017 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <iostream>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"

#include "std_msgs/msg/string.hpp"

#include "rclcpp/serialization.hpp"

#include "demo_nodes_cpp/visibility_control.h"

using namespace std::chrono_literals;

namespace demo_nodes_cpp
{

class SerializedMessageTalker : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit SerializedMessageTalker(const rclcpp::NodeOptions & options)
  : Node("serialized_message_talker", options),
    serialized_msg_(0u)
  {
    // Create a function for when messages are to be sent.
    auto publish_message =
      [this]() -> void
      {
        // In this example we send a std_msgs/String as serialized data.
        // This is the manual CDR serialization of a string message with the content of
        // Hello World: <count_> equivalent to talker example.
        // The serialized data is composed of a 8 Byte header
        // plus the length of the actual message payload.
        // If we were to compose this serialized message by hand, we would execute the following:
        // rcutils_snprintf(
        //   serialized_msg_.buffer, serialized_msg_.buffer_length, "%c%c%c%c%c%c%c%c%s %zu",
        //   0x00, 0x01, 0x00, 0x00, 0x0f, 0x00, 0x00, 0x00, "Hello World:", count_++);

        // In order to ease things up, we call the rmw_serialize function,
        // which can do the above conversion for us.
        // For this, we initially fill up a std_msgs/String message and fill up its content
        auto string_msg = std::make_shared<std_msgs::msg::String>();
        string_msg->data = "Hello World:" + std::to_string(count_++);

        // We know the size of the data to be sent, and thus can pre-allocate the
        // necessary memory to hold all the data.
        // This is specifically interesting to do here, because this means
        // no dynamic memory allocation has to be done down the stack.
        // If we don't allocate enough memory, the serialized message will be
        // dynamically allocated before sending it to the wire.
        auto message_header_length = 8u;
        auto message_payload_length = static_cast<size_t>(string_msg->data.size());
        serialized_msg_.reserve(message_header_length + message_payload_length);

        static rclcpp::Serialization<std_msgs::msg::String> serializer;
        serializer.serialize_message(string_msg.get(), &serialized_msg_);

        // For demonstration we print the ROS 2 message format
        printf("ROS message:\n");
        printf("%s\n", string_msg->data.c_str());
        // And after the corresponding binary representation
        printf("serialized message:\n");
        for (size_t i = 0; i < serialized_msg_.size(); ++i) {
          printf("%02x ", serialized_msg_.get_rcl_serialized_message().buffer[i]);
        }
        printf("\n");

        pub_->publish(serialized_msg_);
      };

    rclcpp::QoS qos(rclcpp::KeepLast(7));
    pub_ = this->create_publisher<std_msgs::msg::String>("chatter", qos);

    // Use a timer to schedule periodic message publishing.
    timer_ = this->create_wall_timer(1s, publish_message);
  }

private:
  size_t count_ = 1;
  rclcpp::SerializedMessage serialized_msg_;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

}  // namespace demo_nodes_cpp

RCLCPP_COMPONENTS_REGISTER_NODE(demo_nodes_cpp::SerializedMessageTalker)

在订阅者中反序列化

我们还是以ROS 2的例子listener_serialized_message(demo_nodes_cpp/src/topics/listener_serialized_message.cpp)来讲解。
先创建一个消息接受者类

// Create a Listener class that subclasses the generic rclcpp::Node base class.
// The main function below will instantiate the class as a ROS node.
class SerializedMessageListener : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit SerializedMessageListener(const rclcpp::NodeOptions & options)
  : Node("serialized_message_listener", options)
  {

然后通过订阅者sub_来订阅消息

    // Create a subscription to the topic which can be matched with one or more compatible ROS
    // publishers.
    // Note that not all publishers on the same topic with the same type will be compatible:
    // they must have compatible Quality of Service policies.
    sub_ = create_subscription<std_msgs::msg::String>("chatter", 10, callback);
  }

private:
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};

需要注意的是订阅者sub_的模板参数是std_msgs::msg::String,而不是SerializedMessage 。只是因为SerializedMessage 是对std_msgs::msg::String类型消息进行了打包。
但是有趣的是,订阅的回调函数的入参是const std::shared_ptr<rclcpp::SerializedMessage>类型。


    auto callback =
      [](const std::shared_ptr<rclcpp::SerializedMessage> msg) -> void
      {
        // Print the serialized data message in HEX representation
        // This output corresponds to what you would see in e.g. Wireshark
        // when tracing the RTPS packets.
        std::cout << "I heard data of length: " << msg->size() << std::endl;
        for (size_t i = 0; i < msg->size(); ++i) {
          printf("%02x ", msg->get_rcl_serialized_message().buffer[i]);
        }
        printf("\n");

        // In order to deserialize the message we have to manually create a ROS 2
        // message in which we want to convert the serialized data.
        using MessageT = std_msgs::msg::String;
        MessageT string_msg;
        auto serializer = rclcpp::Serialization<MessageT>();
        serializer.deserialize_message(msg.get(), &string_msg);
        // Finally print the ROS 2 message data
        std::cout << "serialized data after deserialization: " << string_msg.data << std::endl;
      };

这次生成一个局部非静态的序列化器serializer ,使用它的deserialize_message方法将SerializedMessage消息反序列化为std_msgs::msg::String类型。

完整代码

// Copyright 2017 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdio>
#include <iostream>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp_components/register_node_macro.hpp"

#include "std_msgs/msg/string.hpp"

#include "demo_nodes_cpp/visibility_control.h"

namespace demo_nodes_cpp
{
// Create a Listener class that subclasses the generic rclcpp::Node base class.
// The main function below will instantiate the class as a ROS node.
class SerializedMessageListener : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit SerializedMessageListener(const rclcpp::NodeOptions & options)
  : Node("serialized_message_listener", options)
  {
    setvbuf(stdout, NULL, _IONBF, BUFSIZ);
    // We create a callback to a rmw_serialized_message_t here. This will pass a serialized
    // message to the callback. We can then further deserialize it and convert it into
    // a ros2 compliant message.
    auto callback =
      [](const std::shared_ptr<rclcpp::SerializedMessage> msg) -> void
      {
        // Print the serialized data message in HEX representation
        // This output corresponds to what you would see in e.g. Wireshark
        // when tracing the RTPS packets.
        std::cout << "I heard data of length: " << msg->size() << std::endl;
        for (size_t i = 0; i < msg->size(); ++i) {
          printf("%02x ", msg->get_rcl_serialized_message().buffer[i]);
        }
        printf("\n");

        // In order to deserialize the message we have to manually create a ROS 2
        // message in which we want to convert the serialized data.
        using MessageT = std_msgs::msg::String;
        MessageT string_msg;
        auto serializer = rclcpp::Serialization<MessageT>();
        serializer.deserialize_message(msg.get(), &string_msg);
        // Finally print the ROS 2 message data
        std::cout << "serialized data after deserialization: " << string_msg.data << std::endl;
      };
    // Create a subscription to the topic which can be matched with one or more compatible ROS
    // publishers.
    // Note that not all publishers on the same topic with the same type will be compatible:
    // they must have compatible Quality of Service policies.
    sub_ = create_subscription<std_msgs::msg::String>("chatter", 10, callback);
  }

private:
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};

}  // namespace demo_nodes_cpp

RCLCPP_COMPONENTS_REGISTER_NODE(demo_nodes_cpp::SerializedMessageListener)

执行效果

发布者(序列化)

在这里插入图片描述

订阅者(反序列化)

在这里插入图片描述


原文地址:https://blog.csdn.net/breaksoftware/article/details/140521459

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!