• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

ROS2探索总结(十二)—— ROS2 Python编程基础

人工智能 古月 3050次浏览 0个评论

~欢迎关注~
微信公众号:古月居(guyue_home)
新浪微博:古月春旭(
https://weibo.com/hcx196
知乎专栏:古月居(
https://zhuanlan.zhihu.com/guyuehome
原文链接:
ROS2 Python编程基础

 

ROS最重要的特性之一就是多语言的支持,可以使用C++、Python等语言进行程序的开发,ROS2会继续强化这个特性,对更多语言提供丰富的支持。

这里我们尝试ROS2当中的Python编程,实现话题和服务通信。

代码链接:
https://github.com/ros2/demos
官方教程:
https://index.ros.org/doc/ros2/Tutorials/Python-Programming

在进行操作前,请先参考以下链接,完成ROS 2的安装和例程编译:

ROS 2发布第三个正式版——Crystal Clemmys

一、话题通信

代码编译后,在终端设置环境变量,并使用如下命令运行listener和talker:

$ ros2 run demo_nodes_py listener
$ ros2 run demo_nodes_py talker

运行成功后,可以看到如下效果:

Screenshot from 2019-01-13 23-26-40

以上talker和listener的具体实现,我们可以来分析下代码,分析内容请见代码中的注释:

  • talker.py
    # 引用python接口
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    class Talker(Node):
    
       def __init__(self):
           super().__init__('talker')
           self.i = 0
       
           # 创建一个Publisher,发布名为chatter的topic,消息类型为String
           self.pub = self.create_publisher(String, 'chatter')
       
           # 设置定时器,周期调用定时器回调函数timer_callback
           timer_period = 1.0
           self.tmr = self.create_timer(timer_period, self.timer_callback)
    
       def timer_callback(self):
           # 创建String类型的消息
           msg = String()
           msg.data = 'Hello World: {0}'.format(self.i)
           self.i += 1
           self.get_logger().info('Publishing: "{0}"'.format(msg.data))
       
           # 发布消息
           self.pub.publish(msg)
    
    
    def main(args=None):
       # ROS节点初始化
       rclpy.init(args=args)
    
       # 创建节点及发布器
       node = Talker()
    
       # 循环
       rclpy.spin(node)
    
       # 销毁节点,退出程序
       node.destroy_node()
       rclpy.shutdown()
    
    
    if __name__ == '__main__':
       main()

 

  • listener.py
# 引用python接口
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Listener(Node):

   def __init__(self):
       super().__init__('listener')
   
       # 创建一个subscriber,订阅名为chatter的topic
       # 消息类型是String,回调函数为chatter_callback
       self.sub = self.create_subscription(String, 'chatter', self.chatter_callback)

   def chatter_callback(self, msg):
       # 将收到的消息数据打印出来
       self.get_logger().info('I heard: [%s]' % msg.data)

def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)

   # 创建节点及订阅器
   node = Listener()
 
   # 循环查询消息队列,收到消息后进入回调函数
   rclpy.spin(node)

   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()

二、服务通信

接下来我们看下ROS另外一种核心通信机制——服务的实现,运行以下命令实现ROS 1教程当中经典的加法求和服务例程:

$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run demo_nodes_py add_two_ints_client

可以看到如下效果:

Screenshot from 2019-01-13 23-27-57

add_two_ints_server和add_two_ints_client的具体实现,继续来分析代码:

  • add_two_ints_server.py
# 引用python接口
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node

class AddTwoIntsServer(Node):

   def __init__(self):
       super().__init__('add_two_ints_server')
   
       # 创建一个service服务端,提供名为AddTwoInts的service,
       # 消息类型为AddTwoInts,回调函数为add_two_ints_callback
       self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

   def add_two_ints_callback(self, request, response):
       # 处理服务功能,将求和结果放入应答数据中
       response.sum = request.a + request.b
       self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))

       return response

def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)

   # 创建服务端
   node = AddTwoIntsServer()

   # 循环
   rclpy.spin(node)

   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()
  • add_two_ints_client.py
# 引用python接口
from example_interfaces.srv import AddTwoInts
import rclpy

def main(args=None):
   # ROS节点初始化
   rclpy.init(args=args)

   # 创建节点
   node = rclpy.create_node('add_two_ints_client')

   # 创建客户端
   cli = node.create_client(AddTwoInts, 'add_two_ints')
 
   # 等待服务端
   while not cli.wait_for_service(timeout_sec=1.0):
       print('service not available, waiting again...')
   
   # 创建请求数据
   req = AddTwoInts.Request()
   req.a = 2
   req.b = 3
 
   # 异步请求(非阻塞)
   future = cli.call_async(req)
 
   # 等待服务端应答
   rclpy.spin_until_future_complete(node, future)
   if future.result() is not None:
       node.get_logger().info('Result of add_two_ints: %d' % future.result().sum)
   else:
       node.get_logger().error('Exception while calling service: %r' % future.exception())

   # 销毁节点,退出程序
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()

三、QoS配置

ROS 2中的通信中间件改用DDS,而QoS是DDS中非常重要的一环,控制了各方面与底层的通讯机制,主要从时间限制、可靠性、持续性、历史记录几个方面,满足用户针对不同场景的数据应用需求。

以话题通信为例,我们看下Python程序中该如何配置QoS:

  • talker_qos.py
import argparse
import sys

# 引用python接口
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
from rclpy.qos import QoSReliabilityPolicy

from std_msgs.msg import String

class TalkerQos(Node):

   def __init__(self, qos_profile):
       super().__init__('talker_qos')
       self.i = 0
   
       # 传输模式:Reliable、Best effort
       if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
           self.get_logger().info('Reliable talker')
       else:
           self.get_logger().info('Best effort talker')
     
       # 创建一个Publisher,发布名为chatter的topic,消息类型为String,设置qos参数
       self.pub = self.create_publisher(String, 'chatter', qos_profile=qos_profile)

       # 设置定时器,周期调用定时器回调函数timer_callback
       timer_period = 1.0
       self.tmr = self.create_timer(timer_period, self.timer_callback)

   def timer_callback(self):
       # 创建String类型的消息
       msg = String()
       msg.data = 'Hello World: {0}'.format(self.i)
       self.i += 1
       self.get_logger().info('Publishing: "{0}"'.format(msg.data))
   
       # 发布消息
       self.pub.publish(msg)

def main(argv=sys.argv[1:]):
   # 解析输入参数
   parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
   # 是否使用relibale可靠传输模式
   parser.add_argument(
       '--reliable', dest='reliable', action='store_true',
       help='set qos profile to reliable')
   parser.set_defaults(reliable=False)
 
   # 循环发布次数
   parser.add_argument(
       '-n', '--number_of_cycles', type=int, default=20,
       help='number of sending attempts')

   # 其他输入参数  
   parser.add_argument(
       'argv', nargs=argparse.REMAINDER,
       help='Pass arbitrary arguments to the executable')
   args = parser.parse_args(argv)
 
   # ROS节点初始化
   rclpy.init(args=args.argv)

   # 设置qos参数
   if args.reliable:
       custom_qos_profile = qos_profile_default
   else:
       custom_qos_profile = qos_profile_sensor_data

   # 创建节点及发布器
   node = TalkerQos(custom_qos_profile)

   # 循环
   cycle_count = 0
   while rclpy.ok() and cycle_count < args.number_of_cycles:
       rclpy.spin_once(node)
       cycle_count += 1

   # 销毁节点并退出
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()
  • listener_qos.py
import argparse
import sys

# 引用python接口
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
from rclpy.qos import QoSReliabilityPolicy

from std_msgs.msg import String

class ListenerQos(Node):

   def __init__(self, qos_profile):
       super().__init__('listener_qos')
   
       # 传输模式:Reliable、Best effort
       if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
           self.get_logger().info('Reliable listener')
       else:
           self.get_logger().info('Best effort listener')
     
       # 创建一个subscriber,订阅名为chatter的topic
       # 消息类型是String,回调函数为chatter_callback,设置qos参数
       self.sub = self.create_subscription(
           String, 'chatter', self.chatter_callback, qos_profile=qos_profile)

   def chatter_callback(self, msg):
       # 将收到的消息数据打印出来
       self.get_logger().info('I heard: [%s]' % msg.data)


def main(argv=sys.argv[1:]):
   # 解析输入参数
   parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 
   # 是否使用relibale可靠传输模式
   parser.add_argument(
       '--reliable', dest='reliable', action='store_true',
       help='set qos profile to reliable')
   parser.set_defaults(reliable=False)
 
   # 循环订阅次数
   parser.add_argument(
       '-n', '--number_of_cycles', type=int, default=20,
       help='max number of spin_once iterations')
   
 # 其他输入参数
   parser.add_argument(
       'argv', nargs=argparse.REMAINDER,
       help='Pass arbitrary arguments to the executable')
   args = parser.parse_args(argv)
 
   # ROS节点初始化
   rclpy.init(args=args.argv)

   # 设置qos参数
   if args.reliable:
       custom_qos_profile = qos_profile_default
   else:
       custom_qos_profile = qos_profile_sensor_data

   # 创建节点及订阅器
   node = ListenerQos(custom_qos_profile)

   # 循环
   cycle_count = 0
   while rclpy.ok() and cycle_count < args.number_of_cycles:
       rclpy.spin_once(node)
       cycle_count += 1

   # 销毁节点并退出
   node.destroy_node()
   rclpy.shutdown()

if __name__ == '__main__':
   main()

ROS 2针对四种类型的通信,提供了QoS的默认配置:

  • Default QoS settings for publishers and subscriptions
    In order to make the transition from ROS1 to ROS2, exercising a similar network behavior is desirable. By default, publishers and subscriptions are reliable in ROS2, have volatile durability, and “keep last” history.
  • Services
    In the same vein as publishers and subscriptions, services are reliable. It is especially important for services to use volatile durability, as otherwise service servers that re-start may receive outdated requests.
  • Sensor data
    For sensor data, in most cases it’s more important to receive readings in a timely fashion, rather than ensuring that all of them arrive. That is, developers want the latest samples as soon as they are captured, at the expense of maybe losing some. For that reason the sensor data profile uses best effort reliability and a smaller queue depth.
  • Parameters
    Parameters are based on services, and as such have a similar profile. The difference is that parameters use a much larger queue depth so that requests do not get lost when, for example, the parameter client is unable to reach the parameter service server.

参考:http://design.ros2.org/articles/qos.html

以上例程运行时,默认使用的是Best effort模式:

Screenshot from 2019-01-14 15-52-41

可以通过命令输入切换成Reliable模式:

Screenshot from 2019-01-14 15-54-46

还可以设置发布或订阅的次数:

Screenshot from 2019-01-14 15-57-14

 

更多内容欢迎关注:

微信公众号:古月居 (guyue_home)

新浪微博:古月春旭 (https://weibo.com/hcx196)

知乎专栏:古月居 (https://zhuanlan.zhihu.com/guyuehome)


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明ROS2探索总结(十二)—— ROS2 Python编程基础
喜欢 (0)

您必须 登录 才能发表评论!

加载中……