【从零开始的ROS四轴机械臂控制(六)】
-
- 九、逻辑控制节点
-
- 1.运动控制方法
-
- (1)逆向运动学
- (2)反馈控制
- 2.各节点之间的联系
- 3.相关程序
-
- (1)img_process节点
- (2)arm_command节点
- 4.运行程序
九、逻辑控制节点
逻辑控制节点是实现我们逻辑功能的核心,即调用图像处理,和夹爪移动服务来夹取目标放到相应的位置。其效果如下
1.运动控制方法
首先是控制机械臂移动的方法,即如何把图像信息转换成相应的joint关节角度。由于我们摄像头位置和机械手模型设计的关系,我想到了两种解决办法。一是逆向运动学,二是反馈控制。
(1)逆向运动学
逆向运动学是指,根据由夹爪坐标来解算各个关节的角度。使用的前提是,除了关节坐标我们都需要知道,包括各个link的长度,以及夹爪需要到达的位置。 让我们来看一下理想的简化机械臂的左视图,
各个link和灰色虚线之间的夹角图中所示,joint2, jonit3, joint4的角度是我们需要关注的。将其单独摘出如下图:
我们需要根据dis_y和dis_z来求解joint2、joint3、joint4的值。除此之外,还有一个条件就是link4(joint4–>camera之间为link4)需要平行于地面.假设各个link等长为可以求得以下关系:
代入joint2、joint3可得
from scipy.optimize import root, fsolve
import math
class Solve(object):
def __init__(self):
self.y = 0.0
self.z = 0.0
res = fsolve(self.f, [20, 90], xtol=0.001)
print(self.joint_sovle_range(res))
print(math.cos(res[0]) + math.sin(math.pi / 2 + res[0] - res[1]))
print(self.f(res))
def f(self, x):
a,b = x.tolist()
return[math.cos(a) + math.sin(math.pi/2 + a - b) - self.y, -math.sin(a) + math.cos(math.pi/2 + a - b) - self.z]
def joint_sovle_range(self, res):
for i in range(2):
while res[i - 1] > 3.14:
res[i - 1] = res[i - 1] - 3.14
while res[i - 1] < 0.0:
res[i - 1] = res[i - 1] + 3.14
return res
if __name__ == '__main__':
Solve()
但是这种方法有种很大的缺陷,之前说过使用的前提是,除了关节坐标我们都需要知道,包括各个link的长度,以及夹爪需要到达的位置。夹爪需要到达的位置是由摄像头提供的。从摄像头的图像坐标投射到三维坐标需要一定的转换系数,这个常数是我们无法轻易获得的。且转换常数需要一个较高准确度,否则会导致计算出的joint值出错。 种种限制条件都导致了这种方法都不一个较好的解决方法。那么问题来了,如何解决从摄像头的图像坐标投射到三维坐标所需要转换系数,或者有什么好的方法来替代它?这时候我能想到一个比较好的方法就是反馈控制了,让夹爪根据图像自主调节位置。
(2)反馈控制
反馈控制是指,从图像获得位置误差,根据误差来移动一定程度的位置。然后再利用图像获得目标位置,依次循环,直到达到可接受的误差。
假设上图是我们从摄像头获得的图像,我们从img_process获得了目标的中点(midpoint),我们需要将目标移动到视觉的中心,所以从图中的出了x和y方向上的误差。
若目标靠近,则图像范围会变大,所以图像范围就可以作为一个新的条件来控制jonit3.
和之前一样,我们需要保证link4节点平行于地面,其原因主要是为了方便于图像处理。 一个非常重要的一点是,这个时候的误差是基于像素的,无法直接作用于角度,所以就需要一个转换,从像素误差到夹爪角度。我在程序中只是简单的提取了它的符号,使每次循环的只在相应的符号上移动一个单位,这样就导致了移动比较缓慢。这里还可以采用PID控制,相信会有一个比价好的效果。 以下是相应的功能在程序中的实现
# get the error direction
def img_data_to_error(self, midpoint, image_range, z_range_parameter=340):
# get error in x,y direction
camera_midpoint = [320, 290]
delta_x = camera_midpoint[0] - midpoint.data[0]
delta_y = camera_midpoint[1] - midpoint.data[1]
print(midpoint.data)
# get error in z direction
z_x = image_range.data[2] - image_range.data[0]
z_y = image_range.data[3] - image_range.data[1]
object_range = math.sqrt(z_x * z_x + z_y * z_y)
delta_z = z_range_parameter - object_range
if not self.z_at_goal:
self.x_at_goal = self.at_goal(midpoint.data[0], camera_midpoint[0])
self.y_at_goal = self.at_goal(midpoint.data[1], camera_midpoint[1])
self.z_at_goal = self.at_goal(object_range, z_range_parameter, error_parameter=30)
if self.z_at_goal:
self.x_at_goal = self.at_goal(midpoint.data[0], camera_midpoint[0], error_parameter=50)
self.y_at_goal = self.at_goal(midpoint.data[1], camera_midpoint[1], error_parameter=50)
self.z_at_goal = self.at_goal(object_range, z_range_parameter, error_parameter=30)
print(delta_x, delta_y, delta_z)
# get the sign in all direction
x_sign = self.sign_output(self.x_at_goal, delta_x)
y_sign = self.sign_output(self.y_at_goal, delta_y)
z_sign = self.sign_output(self.z_at_goal, delta_z)
return -x_sign, y_sign, z_sign
2.各节点之间的联系
之前我们已经编写除了GoToPosition服务和image_processs节点。然后是主逻辑节点arm_command。它需要发布当前的循环信息和订阅图像信息和joint信息,根据获取到的数据来控制机械臂移动。 arm_command和img_process节点关系,以及子函数,数据之间关系大致如下图所示,蓝色的虚线为节点间的连接。
首先打开arm_command节点,初始化所有数据和机械臂动作,等待来自于img_process节点的一个目标检测信息,如果检测到目标,则计算在各个方向的误差。 如果误差不再允许范围之内,则输出相应的移动符号到arm_controller函数。arm_controller函数接收当前的joint数据,加上收到的移动符号乘作用系数(每次移动的单位距离),得出一个新的关节角度值。调用arm_pos_set函数将其作用到夹爪上。移动完成后发送给Img_process使之进行下一轮循环。 如果误差在允许范围之内,则可判定夹爪到达夹取位置,便可控制夹爪进行夹取。 如果没有检测到目标,则转动模型,即控制joint1循环转动寻找目标。
3.相关程序
为了完成检测-移动循环,img_process和arm_command都进行了修改。
(1)img_process节点
对img_process节点进行了修改和优化,因为在使用时发现,该节点检测速度较慢,延迟较高,无法及时的检测目标。
#!/usr/bin/env python
import math
import rospy
import cv2
import numpy as np
from sensor_msgs.msg import Image, JointState
from std_msgs.msg import Float64, Bool, Float64MultiArray
from cv_bridge import CvBridge, CvBridgeError
class ImgProcess(object):
def __init__(self):
self.node_name = 'image_process'
rospy.init_node(self.node_name)
self.bridge = CvBridge()
self.loop_sign = False
self.ac_run = False
self.r = rospy.Rate(5)
# Subscribers
self.sub1 = rospy.Subscriber("rgb_camera/image_raw", Image, self.img_process_callback)
# Publishers
self.pub1 = rospy.Publisher("/image_process/midpoint", Float64MultiArray, queue_size=1)
self.pub2 = rospy.Publisher("/image_process/image_range", Float64MultiArray, queue_size=1)
self.pub3 = rospy.Publisher("/image_process/point_exist", Bool, queue_size=1)
self.pub4 = rospy.Publisher('/image_process/joint1_position', Float64, queue_size=1)
rospy.loginfo("Waiting for image topics...")
def image_process(self, image):
image_hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
orange_min = np.array([100, 128, 46])
orange_max = np.array([124, 255, 255])
mask = cv2.inRange(image_hsv, orange_min, orange_max)
ret, binary = cv2.threshold(mask, 1, 256, cv2.THRESH_BINARY)
non0_point = 0
non0_rows = 0
non0_cols = 0
length_max = 0
width_max = 0
length_min = 0
width_min = 0
point_exist = False
rows, cols = np.shape(binary)
# first loop check the target point, if it exist, set point_exist to True
for m in range(rows/2):
if point_exist:
break
for n in range(cols/2):
if binary[2*m, 2*n] != 0:
point_exist = True
break
# if target point exist, find its location
if point_exist:
for m in range(rows):
for n in range(cols):
if binary[m, n] != 0:
non0_rows += m
non0_cols += n
non0_point += 1
if non0_point == 1:
width_min = n
length_min = m
if n < width_min:
width_min = n
if n > width_max:
width_max = n
if m < length_min:
length_min = m
if m > length_max:
length_max = m
midpoint = [int(non0_cols / non0_point), int(non0_rows / non0_point)]
min_point = [length_min, width_min]
max_point = [length_max, width_max]
else:
midpoint = [0, 0]
min_point = [0, 0]
max_point = [0, 0]
image_range = [min_point[0], min_point[1], max_point[0], max_point[1]]
return midpoint, image_range, point_exist
def img_process_callback(self, ros_image):
try:
frame = self.bridge.imgmsg_to_cv2(ros_image, "bgr8")
except CvBridgeError:
print('CvBrigeError!')
frame = np.array(frame, dtype=np.uint8)
midpoint, image_range, point_exist = self.image_process(frame)
midpoint_meg = Float64MultiArray(data=midpoint)
image_range_meg = Float64MultiArray(data=image_range)
if point_exist:
rospy.set_param('~point_exist', True)
rospy.loginfo('midpoint: %s, image_range: %s, %s, %s, %s',
midpoint, image_range[0], image_range[1], image_range[2], image_range[3])
joint_state = rospy.wait_for_message('/arm1/joint_states', JointState)
rospy.loginfo('waiting for arm_command...loop_sign is %s', self.loop_sign)
while point_exist:
self.loop_sign = rospy.get_param("/arm_command/loop_sign")
rospy.loginfo('now loop_sign is %s', self.loop_sign)
self.pub1.publish(midpoint_meg)
self.pub2.publish(image_range_meg)
self.pub3.publish(point_exist)
self.pub4.publish(joint_state.position[0])
self.r.sleep()
if self.loop_sign:
print('loop_sign is True, begin the next loop...')
break
else:
rospy.set_param('~point_exist', False)
rospy.loginfo('None targets were detected, %s', point_exist)
self.pub3.publish(point_exist)
if __name__ == '__main__':
try:
ImgProcess()
rospy.spin()
except rospy.ROSInterruptException:
pass
相关程序注释:
# first loop check the target point, if it exist, set point_exist to True
for m in range(rows/2):
if point_exist:
break
for n in range(cols/2):
if binary[2*m, 2*n] != 0:
point_exist = True
break
定义一个新的目标检测函数,涉及运算步骤较少,能快速的检测图像是否存在目标。
while point_exist:
self.loop_sign = rospy.get_param("/arm_command/loop_sign")
rospy.loginfo('now loop_sign is %s', self.loop_sign)
self.pub1.publish(midpoint_meg)
self.pub2.publish(image_range_meg)
self.pub3.publish(point_exist)
self.pub4.publish(joint_state.position[0])
self.r.sleep()
if self.loop_sign:
print('loop_sign is True, begin the next loop...')
break
检测arm_command循环标志,如果循环没有完成,则一直发布当前图像信息,直到检测loop_sign为True则循环结束。 因为arm_command中有rospy.wait_for_message()
函数,如果没有检测到信息会一直等待。肯定会由于某些原因,例如时延等,arm_command会接受不到信息,所以需要img_process一直发布当前图像信息话题。 此处还是用了参数服务器,能很方便的在各个节点中交换信息。
(2)arm_command节点
#!/usr/bin/env python
import math
import rospy
import cv2
import numpy as np
from std_msgs.msg import Float64, Bool, Float64MultiArray
from sensor_msgs.msg import JointState
from scipy.optimize import fsolve
from arm1.srv import *
class ArmCommand(object):
def __init__(self):
node_name = 'arm_command'
rospy.init_node(node_name)
# init symbols of completion of the moving loop
self.dis_y = 0.0
self.dis_z = 0.0
self.joint1_g = 0.0
self.midpoint = 0.0
self.image_range = 0.0
self.joint1_data = 0.0
self.x_at_goal = False
self.y_at_goal = False
self.z_at_goal = False
self.move_end = False
self.loop_sign = False
# init ros data
self.r = rospy.Rate(1)
rospy.set_param('~loop_sign', False)
self.start_time = rospy.Time.now().to_sec()
self.run_sign = False
# init Service
self.arm_mover = rospy.ServiceProxy('/arm_mover/arm_mover', GoToPosition)
self.msg = GoToPositionRequest()
if not self.run_sign:
self.arm_pos_set(pos_init=True)
# init Publisher
self.pub1 = rospy.Publisher('/arm_command/loop_sign', Bool, queue_size=10)
self.pub2 = rospy.Publisher('/arm_command/program_run', Bool, queue_size=10)
# init Subscriber
rospy.Subscriber('/image_process/point_exist', Bool, self.callback)
rospy.Subscriber('/arm_mover/move_end', Bool, self.move_callback)
rospy.loginfo('program is run')
# translate joints' values between calculated value and true value
def value_translation(self, joint_value, joint_name, joint_type):
if joint_type == 'cal2gaz':
if joint_name == 'joint2':
value = joint_value - 0.35
if joint_name == 'joint3':
value = joint_value + 0.6
if joint_name == 'joint4':
value = joint_value + 0.55
if joint_type == 'gaz2cal':
if joint_name == 'joint2':
value = joint_value + 0.35
if joint_name == 'joint3':
value = joint_value - 0.6
if joint_name == 'joint4':
value = joint_value - 0.55
return value
# set all positions of joints with this function
def arm_pos_set(self, joint1=0.0, joint2=0.52,
joint3=-1.57,
joint4=0.98,
right_joint=1.0,
left_joint=1.0,
pos_init=False):
self.msg.joint1 = joint1
self.msg.joint2 = self.value_translation(joint2, 'joint2', 'cal2gaz')
self.msg.joint3 = self.value_translation(joint3, 'joint3', 'cal2gaz')
self.msg.joint4 = self.value_translation(joint4, 'joint4', 'cal2gaz')
self.msg.right_joint = right_joint
self.msg.left_joint = left_joint
response = self.arm_mover(self.msg)
if pos_init:
rospy.loginfo('init all joints position')
# The following parts are use for feedback loop
# at goal detection: return true, if it's less than an acceptable error.
def at_goal(self, pos_joint, goal_joint, error_parameter=40):
result = abs(pos_joint - goal_joint) <= abs(error_parameter)
return result
# determine the direction of the error
def sign_output(self, at_goal, delta):
if not at_goal:
if delta > 0:
sign = 1
else:
sign = -1
else:
sign = 0
return sign
# get the error direction
def img_data_to_error(self, midpoint, image_range, z_range_parameter=340):
# get error in x,y direction
camera_midpoint = [320, 290]
delta_x = camera_midpoint[0] - midpoint.data[0]
delta_y = camera_midpoint[1] - midpoint.data[1]
print(midpoint.data)
# get error in z direction
z_x = image_range.data[2] - image_range.data[0]
z_y = image_range.data[3] - image_range.data[1]
object_range = math.sqrt(z_x * z_x + z_y * z_y)
delta_z = z_range_parameter - object_range
if not self.z_at_goal:
self.x_at_goal = self.at_goal(midpoint.data[0], camera_midpoint[0])
self.y_at_goal = self.at_goal(midpoint.data[1], camera_midpoint[1])
self.z_at_goal = self.at_goal(object_range, z_range_parameter, error_parameter=30)
if self.z_at_goal:
self.x_at_goal = self.at_goal(midpoint.data[0], camera_midpoint[0], error_parameter=50)
self.y_at_goal = self.at_goal(midpoint.data[1], camera_midpoint[1], error_parameter=50)
self.z_at_goal = self.at_goal(object_range, z_range_parameter, error_parameter=30)
print(delta_x, delta_y, delta_z)
# get the sign in all direction
x_sign = self.sign_output(self.x_at_goal, delta_x)
y_sign = self.sign_output(self.y_at_goal, delta_y)
z_sign = self.sign_output(self.z_at_goal, delta_z)
return -x_sign, y_sign, z_sign
def joint_angle_limit(self, joint_value, joint_name):
joint_limit = False
if joint_name == 'joint2':
limit = [0.0, 1.57]
elif joint_name == 'joint3':
limit = [-1.57, 0]
elif joint_name == 'joint4':
limit = [0.0, 1.57]
if not limit[0] <= joint_value <= limit[1]:
joint_value = min(max(limit[0], joint_value), limit[1])
joint_limit = True
return joint_value, joint_limit
def get_joint_value(self):
joint_state = rospy.wait_for_message('/arm1/joint_states', JointState)
# get joints' real value
joint1 = joint_state.position[0]
joint2 = joint_state.position[1]
joint3 = joint_state.position[2]
joint4 = joint_state.position[3]
# translate joints' value
joint2 = self.value_translation(joint2, 'joint2', 'gaz2cal')
joint3 = self.value_translation(joint3, 'joint3', 'gaz2cal')
joint4 = self.value_translation(joint4, 'joint4', 'gaz2cal')
return joint1, joint2, joint3, joint4
def arm_controller(self, x_direction_sign, y_direction_sign, z_direction_sign, effect=0.01):
joint1, joint2, joint3, joint4 = self.get_joint_value()
joint1 = self.joint1_data.data
rospy.loginfo('get :joint1:%s, joint2:%s, joint3:%s, joint4:%s', joint1, joint2, joint3, joint4)
# calculate the next joint value
joint1_cal = joint1 + effect * x_direction_sign
joint2_cal = joint2 + effect * y_direction_sign
joint3_cal = joint3 + effect * 5 * z_direction_sign
joint4_cal = abs(joint3) - abs(joint2) - 0.15
rospy.loginfo('cal :joint1:%s, joint2:%s, joint3:%s, joint4:%s', joint1, joint2, joint3, joint4)
# limit joints' value
joint2_cal, joint2_limit = self.joint_angle_limit(joint2_cal, 'joint2')
joint3_cal, joint3_limit = self.joint_angle_limit(joint3_cal, 'joint3')
joint4_cal, joint4_limit = self.joint_angle_limit(joint4_cal, 'joint4')
# rospy.loginfo('limit:joint1:%s, joint2:%s, joint3:%s, joint4:%s', joint1, joint2, joint3, joint4)
limit = joint2_limit and joint3_limit and joint4_limit
if limit:
rospy.logwarn('target is out of the range!')
# set joints' value, at begin just set values of joint1 and joint2
if not (self.x_at_goal and self.y_at_goal):
self.arm_pos_set(joint1=joint1_cal, joint2=joint2_cal, joint3=joint3, joint4=joint4_cal)
else:
self.arm_pos_set(joint1=joint1_cal, joint2=joint2_cal, joint3=joint3_cal, joint4=joint4_cal)
while not self.move_end:
if self.move_end:
break
return True
def command_callback(self, exist):
self.run_sign = True
if exist:
try:
rospy.loginfo('point is exist!')
poex = rospy.get_param("/image_process/point_exist")
while not (self.x_at_goal and self.y_at_goal and self.z_at_goal):
# get img data
poex = rospy.get_param("/image_process/point_exist")
if not poex:
break
rospy.set_param('~loop_sign', False)
rospy.loginfo('start moving...')
self.midpoint = rospy.wait_for_message('image_process/midpoint', Float64MultiArray)
self.image_range = rospy.wait_for_message('image_process/image_range', Float64MultiArray)
self.joint1_data = rospy.wait_for_message('image_process/joint1_position', Float64)
# deal joints information
x_sign, y_sign, z_sign = self.img_data_to_error(self.midpoint, self.image_range)
print(x_sign, y_sign, z_sign)
self.loop_sign = self.arm_controller(x_sign, y_sign, z_sign, effect=0.01)
# self.pub1.publish(self.loop_sign)
rospy.set_param('~loop_sign', True)
self.r.sleep()
self.r.sleep()
if poex:
print('start to move gripper')
# gripper start run
joint1, joint2, joint3, joint4 = self.get_joint_value()
self.arm_pos_set(joint1=joint1, joint2=joint2, joint3=joint3, joint4=joint4,
left_joint=0.2, right_joint=0.2)
self.r.sleep()
print('return to initial position')
# return to initial position
self.arm_pos_set(left_joint=0.15, right_joint=0.15)
self.r.sleep()
# arm turn to predetermined position
self.arm_pos_set(joint1=1.0, left_joint=0.15, right_joint=0.15)
self.r.sleep()
self.arm_pos_set(joint1=1.0, joint2=0.2, joint3=-1.0, joint4=0.8,
left_joint=0.15, right_joint=0.15)
self.r.sleep()
# gripper opening
self.arm_pos_set(joint1=1.0, joint2=0.2, joint3=-1.0, joint4=0.8,
left_joint=0.15, right_joint=0.15)
self.r.sleep()
self.arm_pos_set(pos_init=True)
self.r.sleep()
rospy.loginfo('program has finish')
rospy.on_shutdown()
except rospy.ServiceException:
rospy.logwarn("Midpoint is exist, but service call failed")
else:
try:
rospy.loginfo('try to find target')
elapsed = rospy.Time.now().to_sec() - self.start_time
joint1 = math.sin(2 * math.pi * 0.05 * elapsed) * (math.pi / 2)
self.arm_pos_set(joint1=joint1)
# self.joint1_g = math.sin(2 * math.pi * 0.1 * elapsed) * (math.pi / 2)
except rospy.ServiceException:
rospy.logwarn("Midpoint is not exist and Service call failed")
def move_callback(self, move_end):
self.move_end = move_end.data
def callback(self, exist):
rospy.wait_for_service('arm_mover/arm_mover')
self.pub2.publish(True)
self.command_callback(exist.data)
if __name__ == '__main__':
try:
ArmCommand()
rospy.spin()
except rospy.ROSInterruptException:
pass
相关程序注释:
while not (self.x_at_goal and self.y_at_goal and self.z_at_goal):
# get img data
poex = rospy.get_param("/image_process/point_exist")
if not poex:
break
rospy.set_param('~loop_sign', False)
rospy.loginfo('start moving...')
self.midpoint = rospy.wait_for_message('image_process/midpoint', Float64MultiArray)
self.image_range = rospy.wait_for_message('image_process/image_range', Float64MultiArray)
self.joint1_data = rospy.wait_for_message('image_process/joint1_position', Float64)
# deal joints information
x_sign, y_sign, z_sign = self.img_data_to_error(self.midpoint, self.image_range)
print(x_sign, y_sign, z_sign)
self.loop_sign = self.arm_controller(x_sign, y_sign, z_sign, effect=0.01)
# self.pub1.publish(self.loop_sign)
rospy.set_param('~loop_sign', True)
self.r.sleep()
以上是回调函数的最主要的反馈循环部分,处理图像数据,移动机械臂。
# translate joints' values between calculated value and true value
def value_translation(self, joint_value, joint_name, joint_type):
if joint_type == 'cal2gaz':
if joint_name == 'joint2':
value = joint_value - 0.35
if joint_name == 'joint3':
value = joint_value + 0.6
if joint_name == 'joint4':
value = joint_value + 0.55
if joint_type == 'gaz2cal':
if joint_name == 'joint2':
value = joint_value + 0.35
if joint_name == 'joint3':
value = joint_value - 0.6
if joint_name == 'joint4':
value = joint_value - 0.55
return value
关节角度换算函数,
从urdf定义的joint初始角度换算到计算使用的初始角度。
程序已经上传github。
4.运行程序
运行程序并添加目标
$ cd ~/catkin_ws
$ source devel/setup.bash
$ roslaunch arm1 gazebo.launch
运行图像处理节点
$ cd ~/catkin_ws
$ source devel/setup.bash
$ rosrun arm1 image_process
运行逻辑控制节点
$ cd ~/catkin_ws
$ source devel/setup.bash
$ rosrun arm1 arm_command
查看摄像头
rqt_image_view /rgb_camera/image_raw
视频如下:
视频
从零开始的ROS四轴机械臂控制