Skip to main content

Writing the Controller ROS Node

ROS1

Using the QingLong robot as an example, write a node for the simulation industrial control computer.

In the io_teleop_robot_control_node/scripts path, create a controller ROS node named openloong_controller_node.py.

The industrial computer node script includes five parts: library imports, establishing the simulation environment, loading the robot parameter files, setting up ROS topics, and the corresponding execution functions.

Library Imports

When using simulation, the library code does not need to be modified; simply copy it. When using the physical robot, different libraries need to be imported based on the format of the data transmitted by the robot.

import sys
import os

sys.path.append(os.path.join(os.path.dirname(__file__), "..")) # Add the parent directory of the current file to the search path
import time
import threading
import yaml
import numpy as np
import pybullet as p
import rospy
from io_teleop_robot_utils.robot_module import AssembledRobot # This library comes from the parent directory of the current file
from std_msgs.msg import Header
from std_srvs.srv import Trigger, TriggerResponse
import argparse
# Import data transmission formats. Some data transmission formats are determined by the physical robot; additional files need to be added to the parent directory of the current file and imported
from sensor_msgs.msg import JointState # Standard data format
from geometry_msgs.msg import PoseArray # Standard data format

from udp_ros_node.msg import PuppetState # Custom data format. Corresponding additional file imported
from my_listener_package.msg import AlohaCmd, vp_control, headcontrol

Establishing the Simulation Environment

This step is used to establish the physical engine to calculate the robot kinematics simulation and set up the model visualization. The custom part is the class name, written here as SimOpenLoongController. Other parts can be copied directly.

# Establish the simulation industrial control computer node
class SimOpenLoongController(AssembledRobot): # Class name is custom, parent class name (AssembledRobot) remains unchanged
# Initialization
def __init__(self, config_path, without_gui, init_debug=False):
client_id = p.connect(p.DIRECT) if without_gui else p.connect(p.GUI) # Connect to initialize the simulation engine. without_gui determines whether to open the simulation interface, parameter is placed in the robot configuration file
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) # Enable graphical user interface
p.configureDebugVisualizer(p.COV_ENABLE_RGB_BUFFER_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_DEPTH_BUFFER_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, 0)
p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 0)
p.resetDebugVisualizerCamera(
cameraDistance=1,
cameraPitch=-60,
cameraYaw=0,
cameraTargetPosition=[0, 0, 0.6],
) # Set camera position and orientation

# Set simulation function
def physics_sim():
# p.setRealTimeSimulation(1)
while not rospy.is_shutdown():
p.stepSimulation()
time.sleep(1.0 / 240.0)
# Create a thread to run the simulation
physics_sim_thread = threading.Thread(target=physics_sim)
physics_sim_thread.start()

After completing this step, add the execution command at the end of the script:

if __name__ == "__main__":
rospy.init_node("openloong_controller_node") # Initialize the node, node name is custom
without_gui = False # Open the simulation interface
robot = SimOpenLoongController("OpenLoong/vr_configs.yml",without_gui) # Call the class in the script, parameter is the corresponding configuration file
rospy.spin() # Execute the program until the node is closed

Open a terminal window and enter the roscore command, then execute the script in another terminal window. The simulation interface should appear.

gui_window

Loading the Robot Parameter Files

This step is used to load the robot's model and parameters into the physical engine. Copy the following code into the class created when establishing the simulation environment (here SimOpenLoongController), no need to modify the code.

        self.physics_clent_id = client_id # Confirm pybullet client id, generated when establishing the simulation environment
self.robot_description_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../io_teleop_robot_descriptions",
) # Add the path where the configuration file is located
self.configs = yaml.safe_load(
open(f"{self.robot_description_path}/{config_path}")
) # Load the configuration file
robot_urdf_path = f"{self.robot_description_path}/{os.path.dirname(config_path)}/{self.configs['urdf_path']}" # Add the path where the urdf file is located
self.wo_controller = self.configs['wo_controller'] # Determine whether to enable the IO general controller, set in the robot configuration file
AssembledRobot.__init__(
self,
robot_urdf_path,
self.configs,
init_debug,
self.wo_controller,
self.physics_clent_id,
) # Load the robot model

Remove the without_gui variable from the execution command in the previous step and execute the modified file. If the robot configuration file is correctly written and placed in the correct path, the robot model should appear in the simulation interface, and the initial poses of the robot's arms and head should match the settings in the configuration file.

init_model

Setting Up ROS Topics

This step sets up communication between the industrial control computer ROS node and other ROS nodes. In the simulation environment, simply continue the code above and copy the following code into the initialization function of the SimOpenLoongController class. If applied on the physical robot, additional communication topics between the industrial control computer node and the robot need to be added. The names of the communication topics refer to the names in the configuration file.

        # Get joint state and then publish to controller
self.joint_state_pub = rospy.Publisher(
f"/io_teleop/joint_states", # Topic name
JointState, # Topic data type
queue_size=1 # Set the size of the queue that caches the published topic information
) # Establish a topic to publish the robot joint states
self.joint_cmd_sub = rospy.Subscriber(
f"/io_teleop/joint_cmd",
JointState,
self.joint_cmd_callback, # Corresponding callback function for the subscribed topic
) # Receive the robot target joint angle commands from the IO general controller
self.joint_cmd_from_vr_sub = rospy.Subscriber(
"/io_teleop/target_joint_from_vr",
JointState,
self.joint_cmd_from_vr_callback,
) # Receive the robot target joint angle commands from the VR device
self.gripper_status_sub = rospy.Subscriber(
"/io_teleop/target_gripper_status",
JointState,
self.gripper_status_callback,
) # Receive the target status commands for the gripper
self.joint_state_pub_rate = rospy.Rate(100) # Set the communication frequency for publishing topics
joint_state_pub_thread = threading.Thread(target=self.joint_state_pub_thread) # Create a thread to publish topics
joint_state_pub_thread.Daemon = True # Set as a daemon thread
joint_state_pub_thread.start() # Start the thread to publish topics
self.reset_service_server = rospy.Service(
"io_teleop_reset_robot", Trigger, self.reset_service_callback
) # Start the service node to reset to the initial position

Execution Functions Corresponding to ROS Topics

This step writes the functions corresponding to the ROS topics, including setting the functions executed by the topic publishing thread and the callback functions for each subscribed topic. The main functions include data type conversion, data variable assignment, and moving the corresponding joints of the robot. For the simulation model, simply copy the following code into the SimOpenLoongController class, paying attention to indentation and not copying it under the initialization function.

    def joint_state_pub_thread(self): # Define the function executed by the topic publishing thread
joint_state = JointState() # Create a variable of type JointState
while not rospy.is_shutdown():
joint_state.header.stamp = rospy.Time.now() # Assign value to the variable header. If the header is not assigned, communication may fail
joint_state.name = []
joint_state.position = []
for i in range(p.getNumJoints(self.robot_id)): # Traverse the robot joints
joint_info = p.getJointInfo(self.robot_id, i) # Get information of each joint
if joint_info[2] == p.JOINT_FIXED: # Filter out fixed joints
continue
joint_state.name.append(joint_info[1].decode("utf-8")) # Add joint name
joint_state.position.append(p.getJointState(self.robot_id, i)[0]) # Add joint position
self.joint_state_pub.publish(joint_state) # Publish joint states
self.joint_state_pub_rate.sleep()

def joint_cmd_callback(self, msg): # Define the callback function after receiving the target joint angle command from the IO general controller
joint_pos = msg.position # Get the received data joint states
joint_names = msg.name # Get the received data joint names
for pos, name in zip(joint_pos, joint_names): # Traverse the received data
joint_id = self.joint_name2id_dict[name] # Convert joint name to joint id
self.move_j([joint_id], [pos]) # Move the corresponding joint to the target value

def joint_cmd_from_vr_callback(self, msg): # Define the callback function after receiving the target joint angle command from the VR device
joint_names = msg.name
joint_positions = msg.position
for pos, name in zip(joint_positions, joint_names):
joint_id = self.joint_name2id_dict[name]
self.move_j([joint_id], [pos])

def gripper_status_callback(self, msg): # Define the callback function after receiving the target command for the gripper
gripper_status = msg.position # Get the target gripper data
for i, gripper in enumerate(self.grippers): # Traverse the grippers
gripper.move(gripper_status[i]) # Move the gripper to the target state


def reset_home(self): # Define the function to reset the robot to the initial state
for i, arm in enumerate(self.arms): # Traverse the arms
self.move_j(arm.arm_joint_index, arm.home_j_pos) # Move the arm to the initial state
waist_config = self.configs["waist"]
self.move_j(waist_config["joint_index"], waist_config["rest_j_pos"]) # Move the waist to the initial state

if hasattr(self, "grippers"): # Check if there are grippers
for i, gripper in enumerate(self.grippers): # Traverse the grippers
gripper.reset(0) # Reset the gripper to the initial position

def reset_service_callback(self, req): # Execution function for the service node to reset to the initial position
print(
"===========================Reset robot to home request received==========================="
)
self.reset_home() # Execute the function to reset to the initial state
time.sleep(1)
response = TriggerResponse() # Set the return parameters of the service node
response.success = True # Successfully executed the reset command, return true
response.message = "Service successfully triggered!" # Inform the client that the reset command has been successfully triggered
return response

Execute the code and open a new terminal, enter rostopic list, and the following should be displayed in the terminal:

/io_teleop/joint_cmd
/io_teleop/joint_states
/io_teleop/target_gripper_status
/io_teleop/target_joint_from_vr

This proves that all topics of the industrial control computer ROS node have been started.

ROS2

To be updated