跳到主要内容

撰写控制器ROS节点

ROS1

以青龙机器人为例,撰写一个仿真中的工控机节点。

io_teleop_robot_control_node/scripts路径下建立控制器ros节点openloong_controller_node.py

工控机节点脚本包含五部分,引用库,建立仿真环境,引用机器人参数文件,ros话题建立及对应执行函数。

引用库

在使用仿真时,库代码无需修改,直接复制即可。在使用实机时,需要根据机器人传输数据的格式引入不同的库。

import sys
import os

sys.path.append(os.path.join(os.path.dirname(__file__), "..")) # 将当前文件的上一级目录添加到搜索路径当中
import time
import threading
import yaml
import numpy as np
import pybullet as p
import rospy
from io_teleop_robot_utils.robot_module import AssembledRobot # 该库来自当前文件的上一级目录
from std_msgs.msg import Header
from std_srvs.srv import Trigger, TriggerResponse
import argparse
# 引入数据传输格式,部分数据传输格式由机器人实机决定,需要在当前文件上一级目录里添加额外的文件并引入
from sensor_msgs.msg import JointState # 标准数据格式
from geometry_msgs.msg import PoseArray # 标准数据格式

from udp_ros_node.msg import PuppetState # 自定义数据格式,对应引入的额外文件
from my_listener_package.msg import AlohaCmd, vp_control, headcontrol

建立仿真环境

此步骤用于建立物理引擎计算机器人运动学仿真,以及模型可视化设置。自定义部分为类名,此处写为SimOpenLoongController,其他部分直接复制即可。

# 建立仿真工控机节点
class SimOpenLoongController(AssembledRobot): #类名称自定义,父类名称(AssembledRobot)固定不变
# 初始化
def __init__(self, config_path, without_gui, init_debug=False):
client_id = p.connect(p.DIRECT) if without_gui else p.connect(p.GUI) # 连接初始化仿真引擎,without_gui决定是否开启仿真界面,参数置于机器人配置文件中
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) # 启用图形用户界面
p.configureDebugVisualizer(p.COV_ENABLE_RGB_BUFFER_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_DEPTH_BUFFER_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 0)
p.resetDebugVisualizerCamera(
cameraDistance=1,
cameraPitch=-60,
cameraYaw=0,
cameraTargetPosition=[0, 0, 0.6],
) # 设置相机位置和朝向

# 设置仿真函数
def physics_sim():
# p.setRealTimeSimulation(1)
while not rospy.is_shutdown():
p.stepSimulation()
time.sleep(1.0 / 240.0)
# 建立线程跑仿真
physics_sim_thread = threading.Thread(target=physics_sim)
physics_sim_thread.start()

该步骤完成后,在脚本末尾添加执行命令:

if __name__ == "__main__":
rospy.init_node("openloong_controller_node") # 初始化节点,节点名称自定义
without_gui = False # 开启仿真界面
robot = SimOpenLoongController("OpenLoong/vr_configs.yml",without_gui) # 调用脚本中的类,参数为对应的配置文件
rospy.spin() # 执行程序直到节点关闭

开启一个终端窗口输入roscore命令,并在另一个终端窗口执行脚本,此时应出现仿真界面

gui_window

引用机器人参数文件

此步骤用于将机器人的模型及参数引入到物理引擎中。将下述代码复制到建立仿真环境时建立的类中(此处为SimOpenLoongController),无需修改代码。

        self.physics_clent_id = client_id # 确定pybullet客户id,建立仿真环境时生成
self.robot_description_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../io_teleop_robot_descriptions",
) # 添加配置文件所在路径
self.configs = yaml.safe_load(
open(f"{self.robot_description_path}/{config_path}")
) # 加载配置文件
robot_urdf_path = f"{self.robot_description_path}/{os.path.dirname(config_path)}/{self.configs['urdf_path']}" # 添加urdf文件所在路径
self.wo_controller = self.configs['wo_controller'] # 确定是否开启IO通用控制器,在机器人配置文件中设置
AssembledRobot.__init__(
self,
robot_urdf_path,
self.configs,
init_debug,
self.wo_controller,
self.physics_clent_id,
) # 加载机器人模型

将上一步执行命令中的without_gui变量删除,并执行更改后的文件。如果编写的机器人配置文件正确,且放置在正确的路径中,应在仿真界面中出现机器人的模型,且机器人的手臂和头的初始位姿应符合配置文件中的设置。

init_model

ros话题建立

此步骤建立工控机ros节点与其他ros节点间的通信。仿真环境下只需要延续上面的代码,将下述代码复制到SimOpenLoongController类的初始化函数中,如果在实机上应用,需要增加工控机节点与机器人之间的通信话题,通信话题名称参考配置文件中的命名。

        # get joint state and then pub to controller
self.joint_state_pub = rospy.Publisher(
f"/io_teleop/joint_states", # 话题名称
JointState, # 话题数据类型
queue_size=1 # 设置缓存发布话题信息的队列的大小
) # 建立发布机器人关节状态量的话题
self.joint_cmd_sub = rospy.Subscriber(
f"/io_teleop/joint_cmd",
JointState,
self.joint_cmd_callback, # 对应关注话题的回执函数
) # 从IO通用控制器中接收机器人目标关节角命令
self.joint_cmd_from_vr_sub = rospy.Subscriber(
"/io_teleop/target_joint_from_vr",
JointState,
self.joint_cmd_from_vr_callback,
) # 从vr设备中接收机器人目标关节角命令
self.gripper_status_sub = rospy.Subscriber(
"/io_teleop/target_gripper_status",
JointState,
self.gripper_status_callback,
) # 接收夹爪目标状态命令
self.joint_state_pub_rate = rospy.Rate(100) # 设置发布话题通信频率
joint_state_pub_thread = threading.Thread(target=self.joint_state_pub_thread) # 建立发布话题线程
joint_state_pub_thread.Daemon = True # 设置守护线程
joint_state_pub_thread.start() # 开启发布话题线程
self.reset_service_server = rospy.Service(
"io_teleop_reset_robot", Trigger, self.reset_service_callback
) # 开启回初始位置服务节点

ros话题对应的执行函数

此步骤编写ros话题对应的函数,包括设置发布话题线程执行的函数,以及各个关注话题对应的回执函数。函数功能主要包括数据类型的转换,数据变量的赋值以及移动机器人对应关节。仿真模型只需将下述代码复制到SimOpenLoongController类中,注意缩进,不要复制到初始化函数下。

    def joint_state_pub_thread(self): # 定义发布话题线程执行函数
joint_state = JointState() # 建立数据类型为JointState的变量
while not rospy.is_shutdown():
joint_state.header.stamp = rospy.Time.now() # 变量header赋值,如果没有给header赋值可能导致无法通信
joint_state.name = []
joint_state.position = []
for i in range(p.getNumJoints(self.robot_id)): # 遍历机器人关节
joint_info = p.getJointInfo(self.robot_id, i) # 获取各关节信息
if joint_info[2] == p.JOINT_FIXED: # 滤除固定关节
continue
joint_state.name.append(joint_info[1].decode("utf-8")) # 添加关节名称
joint_state.position.append(p.getJointState(self.robot_id, i)[0]) # 添加关节位置
self.joint_state_pub.publish(joint_state) # 发布关节状态
self.joint_state_pub_rate.sleep()

def joint_cmd_callback(self, msg): # 定义接收IO通用控制器发出的目标关节角指令后的回执函数
joint_pos = msg.position # 获取接收数据关节状态
joint_names = msg.name # 获取接收数据关节名称
for pos, name in zip(joint_pos, joint_names): # 遍历接收到的数据
joint_id = self.joint_name2id_dict[name] # 将关节名称转关节id
self.move_j([joint_id], [pos]) # 移动对应关节到目标值

def joint_cmd_from_vr_callback(self, msg): # 定义接收vr发出的目标关节角指令后的回执函数
joint_names = msg.name
joint_positions = msg.position
for pos, name in zip(joint_positions, joint_names):
joint_id = self.joint_name2id_dict[name]
self.move_j([joint_id], [pos])

def gripper_status_callback(self, msg): # 定义接收夹爪目标指令后的回执函数
gripper_status = msg.position # 获取目标夹爪数据
for i, gripper in enumerate(self.grippers): # 遍历夹爪
gripper.move(gripper_status[i]) # 移动夹爪到目标状态


def reset_home(self): # 定义机器人回初始状态的函数
for i, arm in enumerate(self.arms): # 遍历双臂
self.move_j(arm.arm_joint_index, arm.home_j_pos) # 手臂移动到初始状态
waist_config = self.configs["waist"]
self.move_j(waist_config["joint_index"], waist_config["rest_j_pos"]) # 腰部回到初始状态

if hasattr(self, "grippers"): # 判断是否存在夹爪
for i, gripper in enumerate(self.grippers): #遍历夹爪
gripper.reset(0) # 夹爪回到初始位置

def reset_service_callback(self, req): # 回初始位置服务节点执行函数
print(
"===========================Reset robot to home request received==========================="
)
self.reset_home() # 执行回初始状态的函数
time.sleep(1)
response = TriggerResponse() # 设置服务节点返回参数
response.success = True # 回初始位置命令成功执行,返回ture
response.message = "Service successfully triggered!" # 告诉客户端已成功触发回初始位置的命令
return response

执行代码,并重新打开一个终端,输入rostopic list,应在终端内看到如下显示:

/io_teleop/joint_cmd
/io_teleop/joint_states
/io_teleop/target_gripper_status
/io_teleop/target_joint_from_vr

证明工控机ros节点的全部话题已经启动。

ROS2

待更新