锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

ROS机器人操作系统底层原理及代码剖析

时间:2023-08-04 16:37:02 sub4p插头连接器

0 目的

本文介绍ROS机器人操作系统(Robot Operating System)实现原理,从底层分析ROS如何实现代码。

1 序列化

通信内容(即消息)message)序列化是通信的基础,所以我们先研究序列化。虽然作者长期从事机器人学习和研发,但他正在研究中ROS这是我这辈子第一次听到序列化这个词。所以可想而知,很多人看到把一个消息序列化这样的描述时是多么的困惑。但事实上,序列化是一个常见的概念。虽然你不知道,但你一定接触过它。先介绍一些序列化的常识,再解释ROS里面的序列化是怎么做的?

1.1 什么是序列化?

“序列化”(Serialization )这意味着将对象转化为字节流。这里说的对象可以理解为“面向对象”里的那个对象,具体的就是存储在内存中的对象数据。相反,过程是反序列化(Deserialization )。虽然挂着机器人的羊头,但后面的介绍都是计算机知识,与机器人无关。序列化是一个纯粹的计算机概念。英语序列化Serialize把一件事变成一系列连续的东西。对图像的描述,数据对象是一团面,序列化就是把面团拉成面条,反序列化就把面条捏回面团。另一个形象的类比是,当我们说话或打电话时,一个人的思想转化为一个一维的声音,然后在另一个人的脑海中转化为一个结构化的思想,这也是一个序列。

面对序列化,很多人心中可能会有很多疑问。
首先,为什么要序列化?或者更具体地说,既然对象的信息最初以字节的形式存储在内存中,为什么要将一些字节数据转换为另一种形式、一维和连续的字节数据呢?假如我们的程序在内存中存储了一个数字,比如25。如何将这个数字传递给其他程序节点或永久存储?直接传输25个数字很简单(字节表示0X当然,19最终会变成二进制,表示11001以高低电平传输存储),或者直接将这个数字(字节表示)写入硬盘。所以,对于本来就是连续的、一维的、一连串的数据(例如字符串),序列化并不需要做太多东西,其本质是就是由内存向其它地方拷贝数据而已。因此,如果你在序列化库中看到它,memcpy函数不用觉得奇怪,因为你知道序列化最底层不过就是在操作内存数据而已(还有些库使用了流的ostream.rdbuf()->sputn函数)。然而,实际程序操作的对象很少如此简单。大多数时候,我们面临不同类型的数据(int、double、string)复杂的数据结构(例如vector、list),它们很可能在内存中分散,而不是连续存储。比如ROS许多新闻包含向量。还有各种各样的指针和引用。此外,如果数据应该在不同架构的计算机上运行,并且由不同编程语言编写的节点程序之间传输,那么问题就更复杂了,它们的字节顺序endianness规定有可能不一样,基本数据类型(比如int)有的长度不同(有的长度不同(有的长度不同)int有四个字节,有八个字节)。这些都不能通过简单地和粘贴原始数据。此时需要序列化和反序列化。因此,在程序之间需要通信时(ROS就这样),或者当你想保存程序的中间运算结果时,序列化就会出现。此外,序列化在一定程度上也起着统一标准的作用。

我们称序列化的东西为object(对象),它可以是任何数据结构或对象:结构体、数组、类实例等。序列化后得到的东西叫做archive,既可以是人类可读的文本形式,也可以是二进制形式。前者比如JSON和XML,这两种是网络应用中最常用的序列格式,可以通过记事本打开阅读;后者是原始的二进制文件,如后缀名称bin人类无法直接阅读一堆0101或00的文件XC9D23E72的。
序列化算是一个常用的功能,所以大多数编程语言(如C 、Python、Java等)会附带用于序列化的库,不需要再造轮子。C 例如,尽管标准STL库不提供序列化功能,但第三方库Boost提供了 [ 2 ] ^{[2]} [2],谷歌的protobuf也是序列化库,Fast-CDR,而且不太出名Cereal,Java自带序列化函数,python第三方第三方pickle模块实现。
简而言之,序列化并不神秘。用户可以查看这些开源的序列化库代码,或者写一个小程序试简单数据的序列化,比如这个例子,或者这个,这将有助于更好地理解ROS中的实现。

1.2 ROS实现序列化

理解序列化,然后回到ROS。我们发现,ROS不使用第三方序列,而不是使用第三方序列化工具。roscpp_core项目下的roscpp_serialization见下图。此功能涉及的代码不多。

为什么ROS不使用现成的序列化工具或图书馆?ROS出生时(2007年),一些序列化库可能不存在(protobuf出生于2008年),更有可能是ROS创作者认为当时没有合适的工具。

1.2.1 serialization.h

核心函数都在serialization.h简而言之,使用C语言标准库memcpy函数将信息复制到流中。让我们来看看具体的实现。
序列化功能的特点是处理多种数据类型,对每种特定类型实现相应的序列化函数。尽量减少代码量,ROS使用模板的概念,所以代码中有很多template
向前梳理,先看Stream这个结构。在C 函数也可以在结构体中定义。Stream翻译成流,流是计算机中的抽象概念提到过字节流。这是什么意思?当需要传输数据时,我们可以将数据想象成连续排列在传送带上的物体,它们是流动的。可以想象磁带或图灵机中的连续纸带。读写文件,使用串口,网络Socket流量常用于通信等领域。例如,常用的输入输出流:cout<<"helllo"; 流动的概念也在演变,因为它被广泛使用。如果你想知道更多,你可以在这里看到。

struct Stream { 
        
  // Returns a pointer to the current position of the stream
  inline uint8_t* getData() { 
         return data_; }
  // Advances the stream, checking bounds, and returns a pointer to the position before it was advanced.
  // \throws StreamOverrunException if len would take this stream past the end of its buffer
  ROS_FORCE_INLINE uint8_t* advance(uint32_t len)
  { 
        
    uint8_t* old_data = data_;
    data_ += len;
    if (data_ > end_)
    { 
        
      // Throwing directly here causes a significant speed hit due to the extra code generated for the throw statement
      throwStreamOverrun();
    }
    return old_data;
  }
  // Returns the amount of space left in the stream
  inline uint32_t getLength() { 
         return static_cast<uint32_t>(end_ - data_); }
  
protected:
  Stream(uint8_t* _data, uint32_t _count) : data_(_data), end_(_data + _count) { 
        }

private:
  uint8_t* data_;
  uint8_t* end_;
};

  注释表明Stream是个基类,输入输出流IStreamOStream都继承自它。Stream的成员变量data_是个指针,指向序列化的字节流开始的位置,它的类型是uint8_t。在Ubuntu系统中,uint8_t的定义是typedef unsigned char uint8_t;,所以uint8_t就是一个字节,可以用size_of()函数检验。data_指向的空间就是保存字节流的。
  输出流类OStream用来序列化一个对象,它引用了serialize函数,如下。

struct OStream : public Stream
{ 
        
  static const StreamType stream_type = stream_types::Output;
  OStream(uint8_t* data, uint32_t count) : Stream(data, count) { 
        }
  /* Serialize an item to this output stream*/
  template<typename T>
  ROS_FORCE_INLINE void next(const T& t)
  { 
        
    serialize(*this, t);
  }
  template<typename T>
  ROS_FORCE_INLINE OStream& operator<<(const T& t)
  { 
        
    serialize(*this, t);
    return *this;
  }
};

  输入流类IStream用来反序列化一个字节流,它引用了deserialize函数,如下。

struct ROSCPP_SERIALIZATION_DECL IStream : public Stream
{ 
        
  static const StreamType stream_type = stream_types::Input;
  IStream(uint8_t* data, uint32_t count) : Stream(data, count) { 
        }
  /* Deserialize an item from this input stream */
  template<typename T>
  ROS_FORCE_INLINE void next(T& t)
  { 
        
    deserialize(*this, t);
  }
  template<typename T>
  ROS_FORCE_INLINE IStream& operator>>(T& t)
  { 
        
    deserialize(*this, t);
    return *this;
  }
};

  自然,serialize函数和deserialize函数就是改变数据形式的地方,它们的定义在比较靠前的地方。它们都接收两个模板,都是内联函数,然后里面没什么东西,只是又调用了Serializer类的成员函数writeread。所以,serializedeserialize函数就是个二道贩子。

// Serialize an object. Stream here should normally be a ros::serialization::OStream
template<typename T, typename Stream>
inline void serialize(Stream& stream, const T& t)
{ 
        
  Serializer<T>::write(stream, t);
}
// Deserialize an object. Stream here should normally be a ros::serialization::IStream
template<typename T, typename Stream>
inline void deserialize(Stream& stream, T& t)
{ 
        
  Serializer<T>::read(stream, t);
}

  所以,我们来分析Serializer类,如下。我们发现,writeread函数又调用了类型里的serialize函数和deserialize函数。头别晕,这里的serializedeserialize函数跟上面的同名函数不是一回事。注释中说:“Specializing the Serializer class is the only thing you need to do to get the ROS serialization system to work with a type”(要想让ROS的序列化功能适用于其它的某个类型,你唯一需要做的就是特化这个Serializer类)。这就涉及到的另一个知识点——模板特化(template specialization)。

template<typename T> struct Serializer
{ 
        
  // Write an object to the stream. Normally the stream passed in here will be a ros::serialization::OStream
  template<typename Stream>
  inline static void write(Stream& stream, typename boost::call_traits<T>::param_type t)
  { 
        
    t.serialize(stream.getData(), 0);
  }
   // Read an object from the stream. Normally the stream passed in here will be a ros::serialization::IStream
  template<typename Stream>
  inline static void read(Stream& stream, typename boost::call_traits<T>::reference t)
  { 
        
    t.deserialize(stream.getData());
  }
  // Determine the serialized length of an object.
  inline static uint32_t serializedLength(typename boost::call_traits<T>::param_type t)
  { 
        
    return t.serializationLength();
  }
};

  接着又定义了一个带参数的宏函数ROS_CREATE_SIMPLE_SERIALIZER(Type),然后把这个宏作用到了ROS中的10种基本数据类型,分别是:uint8_t, int8_t, uint16_t, int16_t, uint32_t, int32_t, uint64_t, int64_t, float, double。说明这10种数据类型的处理方式都是类似的。看到这里大家应该明白了,writeread函数都使用了memcpy函数进行数据的移动。注意宏定义中的template<>语句,这正是模板特化的标志,关键词template后面跟一对尖括号。关于模板特化可以看这里。

#define ROS_CREATE_SIMPLE_SERIALIZER(Type) \ template<> struct Serializer<Type> \ { 
           \ template<typename Stream> inline static void write(Stream& stream, const Type v) \ { 
           \ memcpy(stream.advance(sizeof(v)), &v, sizeof(v) ); \ } \ template<typename Stream> inline static void read(Stream& stream, Type& v) \ { 
           \ memcpy(&v, stream.advance(sizeof(v)), sizeof(v) ); \ } \ inline static uint32_t serializedLength(const Type&) \ { 
           \ return sizeof(Type); \ } \ };
ROS_CREATE_SIMPLE_SERIALIZER(uint8_t)
ROS_CREATE_SIMPLE_SERIALIZER(int8_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint16_t)
ROS_CREATE_SIMPLE_SERIALIZER(int16_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint32_t)
ROS_CREATE_SIMPLE_SERIALIZER(int32_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint64_t)
ROS_CREATE_SIMPLE_SERIALIZER(int64_t)
ROS_CREATE_SIMPLE_SERIALIZER(float)
ROS_CREATE_SIMPLE_SERIALIZER(double)

  对于其它类型的数据,例如boolstd::stringstd::vectorros::Timeros::Durationboost::array等等,它们各自的处理方式有细微的不同,所以不再用上面的宏函数,而是用模板特化的方式每种单独定义,这也是为什么serialization.h这个文件这么冗长。
  对于int、double这种单个元素的数据,直接用上面特化的Serializer类中的memcpy函数实现序列化。对于vector、array这种多个元素的数据类型怎么办呢?方法是分成几种情况,对于固定长度简单类型的(fixed-size simple types),还是用各自特化的Serializer类中的memcpy函数实现,没啥太大区别。对于固定但是类型不简单的(fixed-size non-simple types)或者既不固定也不简单的(non-fixed-size, non-simple types)或者固定但是不简单的(fixed-size, non-simple types),用for循环遍历,一个元素一个元素的单独处理。那怎么判断一个数据是不是固定是不是简单呢?这是在roscpp_traits文件夹中的message_traits.h完成的。其中采用了萃取Type Traits,这是相对高级一点的编程技巧了,笔者也不太懂。
  对序列化的介绍暂时就到这里了,有一些细节还没讲,等笔者看懂了再补。

2 消息订阅发布

2.1 ROS的本质

  如果问ROS的本质是什么,或者用一句话概括ROS的核心功能。那么,笔者认为ROS就是个通信库,让不同的程序节点能够相互对话。很多文章和书籍在介绍ROS是什么的时候,经常使用“ROS是一个通信框架”这种描述。但是笔者认为这种描述并不是太合适。“框架”是个对初学者非常不友好的抽象词汇,用一个更抽象难懂的概念去解释一个本来就不清楚的概念,对初学者起不到任何帮助。而且笔者严重怀疑绝大多数作者能对机器人的本质或者软件框架能有什么太深的理解,他们的见解不会比你我深刻多少。
  既然提到本质,那我们就深入到最基本的问题。在接触无穷的细节之前,我们不妨先做一个哲学层面的思考。那就是,为什么ROS要解决通信问题?机器人涉及的东西千千万万,机械、电子、软件、人工智能无所不包,为什么底层的设计是一套用来通信的程序而不是别的东西。到目前为止,我还没有看到有人讨论过这个问题。这要回到机器人或者智能的本质。当我们在谈论机器人的时候,最首要的问题不是硬件设计,而是对信息的处理。一个机器人需要哪些信息,信息从何而来,如何传递,又被谁使用,这些才是最重要的问题。人类飞不鸟,游不过鱼,跑不过马,力不如牛,为什么却自称万物之灵呢。因为人有大脑,而且人类大脑处理的信息更多更复杂。抛开物质,从信息的角度看,人与动物、与机器人存在很多相似的地方。机器人由许多功能模块组成,它们之间需要协作才能形成一个有用的整体,机器人与机器人之间也需要协作才能形成一个有用的系统,要协作就离不开通信。需要什么样的信息以及信息从何而来不是ROS首先关心的,因为这取决于机器人的应用场景。因此,ROS首先要解决的是通信的问题,即如何建立通信、用什么方式通信、通信的格式是什么等等一系列具体问题。带着这些问题,我们看看ROS是如何设计的。

2.2 客户端库

  实现通信的代码在ros_comm包中,如下。其中clients文件夹一共有127个文件,看来是最大的包了。现在我们来到了ROS最核心的地带。

  客户端这个名词出现的有些突然,一个机器人操作系统里为什么需要客户端。原因是,节点与主节点master之间的关系是client/server,这时每个节点都是一个客户端(client),而master自然就是服务器端(server)。那客户端库(client libraries)是干什么的?就是为实现节点之间通信的。虽然整个文件夹中包含的文件众多,但是我们如果按照一定的脉络来分析就不会眼花缭乱。
  节点之间最主要的通信方式就是基于消息的。为了实现这个目的,需要三个步骤,如下。弄明白这三个步骤就明白ROS的工作方式了。这三个步骤看起来是比较合乎逻辑的,并不奇怪。
  ⑴ 消息的发布者和订阅者(即消息的接收方)建立连接;
  ⑵ 发布者向话题发布消息,订阅者在话题上接收消息,将消息保存在回调函数队列中;
  ⑶ 调用回调函数队列中的回调函数处理消息。

2.2.1 一个节点的诞生

  在建立连接之前,首先要有节点。节点就是一个独立的程序,它运行起来后就是一个普通的进程,与计算机中其它的进程并没有太大区别。一个问题是:ROS中为什么把一个独立的程序称为“节点”?这是因为ROS沿用了计算机网络中“节点”的概念。在一个网络中,例如互联网,每一个上网的计算机就是一个节点。前面我们看到的客户端、服务器这样的称呼,也是从计算机网络中借用的。
  下面来看一下节点是如何诞生的。我们在第一次使用ROS时,一般都会照着官方教程编写一个talker和一个listener节点,以熟悉ROS的使用方法。我们以talker为例,它的部分代码如下。

#include "ros/ros.h"
int main(int argc, char **argv)
{ 
        
  /* You must call one of the versions of ros::init() before using any other part of the ROS system. */
  ros::init(argc, argv, "talker");
  ros::NodeHandle n;

  main函数里首先调用了init()函数初始化一个节点,该函数的定义在init.cpp文件中。当我们的程序运行到init()函数时,一个节点就呱呱坠地了。而且在出生的同时我们还顺道给他起好了名字,也就是"talker"。名字是随便起的,但是起名是必须的。

  我们进入init()函数里看看它做了什么,代码如下,看上去还是挺复杂的。它初始化了一个叫g_global_queue的数据,它的类型是CallbackQueuePtr。这是个相当重要的类,叫“回调队列”,后面还会见到它。init()函数还调用了network、master、this_node、file_log、param这几个命名空间里的init初始化函数各自实现一些变量的初始化,这些变量都以g开头,例如g_hostg_uri,用来表明它们是全局变量。其中,network::init完成节点主机名、IP地址等的初始化,master::init获取master的URI、主机号和端口号。this_node::init定义节点的命名空间和节点的名字,没错,把我们给节点起的名字就存储在这里。file_log::init初始化日志文件的路径。

void init(const M_string& remappings, const std::string& name, uint32_t options)
{ 
        
  if (!g_atexit_registered) { 
        
    g_atexit_registered = true;
    atexit(atexitCallback);
  }
  if (!g_global_queue) { 
        
    g_global_queue.reset(new CallbackQueue);
  }
  if (!g_initialized) { 
        
    g_init_options = options;
    g_ok = true;
    ROSCONSOLE_AUTOINIT;
    // Disable SIGPIPE
#ifndef WIN32
    signal(SIGPIPE, SIG_IGN);
#else
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 0), &wsaData);
#endif
    check_ipv6_environment();
    network::init(remappings);
    master::init(remappings);
    // names:: namespace is initialized by this_node
    this_node::init(name, remappings, options);
    file_log::init(remappings);
    param::init(remappings);
    g_initialized = true;
  }
}

  完成初始化以后,就进入下一步ros::NodeHandle n定义句柄。我们再进入node_handle.cpp文件,发现构造函数NodeHandle::NodeHandle调用了自己的construct函数。然后,顺藤摸瓜找到construct函数,它里面又调用了ros::start()函数。没错,我们又绕回到了init.cpp文件。ros::start()函数主要实例化了几个重要的类,如下。完成实例化后马上又调用了各自的start()函数,启动相应的动作。这些都做完了以后就可以发布或订阅消息了。一个节点的故事暂时就到这了。

TopicManager::instance()->start();
ServiceManager::instance()->start();
ConnectionManager::instance()->start();
PollManager::instance()->start();
XMLRPCManager::instance()->start();

2.2.1 XMLRPC是什么?

  关于ROS节点建立连接的技术细节,官方文档说的非常简单,在这里ROS Technical Overview。没有基础的同学看这个介绍必然还是不懂。
  在ROS中,节点与节点之间的通信依靠节点管理器(master)牵线搭桥。master像一个中介,它介绍节点们互相认识。一旦节点们认识了以后,master就完成自己的任务了,它就不再掺和了。这也是为什么你启动节点后再杀死master,节点之间的通信依然保持正常的原因。
  使用过电驴和迅雷而且研究过BitTorrent的同学对master的工作方式应该很熟悉,master就相当于Tracker服务器,它存储着其它节点的信息。我们每次下载之前都会查询Tracker服务器,找到有电影资源的节点,然后就可以与它们建立连接并开始下载电影了。
  那么master是怎么给节点牵线搭桥的呢?ROS使用了一种叫XMLRPC的方式实现这个功能。XMLRPC中的RPC的意思是远程过程调用(Remote Procedure Call)。简单来说,远程过程调用的意思就是一个计算机中的程序(在我们这就是节点啦)可以调用另一个计算机中的函数,只要这两个计算机在一个网络中。这是一种听上去很高大上的功能,它能让节点去访问网络中另一台计算机上的程序资源。XMLRPC中的XML我们在1.1节讲消息序列化时提到了,它就是一种数据表示方式而已。所以合起来,XMLRPC的意思就是把由XML表示的数据发送给其它计算机上的程序运行,运行后返回的结果仍然以XML格式返回回来,然后我们通过解析它(还原回纯粹的数据)就能干别的事了。想了解更多XMLRPC的细节可以看这个XML-RPC:概述。
  举个例子,一个XMLRPC请求是下面这个样子的。因为XMLRPC是基于HTTP协议的,所以下面的就是个标准的HTTP报文。

POST / HTTP/1.1
User-Agent: XMLRPC++ 0.7
Host: localhost:11311
Content-Type: text/xml
Content-length: 78

<?xml version="1.0"?>
<methodCall>
   <methodName>circleArea</methodName>
      <params>
         <param>
            <value><double>2.41</double></value>
         </param>
      </params>
</methodCall>

  如果你没学过HTTP协议,看上面的语句可能会感到陌生。《图解HTTP》这本小书可以让你快速入门。HTTP报文比较简单,它分两部分,前半部分是头部,后半部分是主体。头部和主体之间用空行分开,这都是HTTP协议规定的标准。上面主体部分的格式就是XML,见的多了你就熟悉了。所以,XMLRPC传递的消息其实就是主体部分是XML格式的HTTP报文而已,没什么神秘的。

  对应客户端一个XMLRPC请求,服务器端会执行它并返回一个响应,它也是一个HTTP报文,如下。它的结构和请求一样,不再解释了。所以,XMLRPC跟我们上网浏览网页的过程其实差不多。

HTTP/1.1 200 OK
Date: Sat, 06 Oct 2001 23:20:04 GMT
Server: Apache.1.3.12 (Unix)
Connection: close
Content-Type: text/xml
Content-Length: 124

<?xml version="1.0"?>
<methodResponse>
   <params>
      <param>
         <value><double>18.24668429131</double></value>
      </param>
   </params>
</methodResponse>

2.2.2 ROS中XMLRPC的实现

  上面的例子解释了XMLRPC是什么?下面我们看看ROS是如何实现XMLRPC的。ROS使用的XMLRPC介绍在这里:http://wiki.ros.org/xmlrpcpp。这次ROS的创作者没有从零开始造轮子,而是在一个已有的XMLRPC库的基础上改造的。
  XMLRPC的C++代码在下载后的ros_comm-noetic-devel\utilities\xmlrpcpp路径下。还好,整个工程不算太大。XMLRPC分成客户端和服务器端两大部分。咱们先看客户端,主要代码在XmlRpcClient.cpp文件里。
  擒贼先擒王,XmlRpcClient.cpp文件中最核心的函数就是execute,用于执行远程调用,代码如下。

// Execute the named procedure on the remote server.
// Params should be an array of the arguments for the method.
// Returns true if the request was sent and a result received (although the result might be a fault).
bool XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result)
{ 
        
  XmlRpcUtil::log(1, "XmlRpcClient::execute: met

相关文章