Mastering ROS 2 for Robotic Control
What is ROS 2 and Why is it Used?
ROS 2 (Robot Operating System 2) is a flexible framework for writing robot software. Unlike traditional operating systems, ROS 2 is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robot platforms.
ROS 2 is preferred over its predecessor due to:
- Improved security features for multi-robot systems
- Better real-time support
- Enhanced cross-platform compatibility
- Modern build system using CMake
- Better support for commercial applications
Core Communication Patterns
Nodes
Nodes are the fundamental building blocks of ROS 2 applications. Each node performs a specific task and communicates with other nodes through topics, services, or actions. A typical robot might have nodes for sensor processing, motion planning, and control.
To create a node in Python:
import rclpy
from rclpy.node import Node
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Node initialization code here
Topics
Topics enable asynchronous communication between nodes using a publish-subscribe model. Publishers send messages to a topic, and subscribers receive those messages. This decouples nodes in time and space.
Example of publishing to a topic:
from std_msgs.msg import String
def create_publisher(self):
self.publisher = self.create_publisher(String, 'robot_commands', 10)
Example of subscribing to a topic:
def create_subscriber(self):
self.subscription = self.create_subscription(
String,
'sensor_data',
self.sensor_callback,
10)
def sensor_callback(self, msg):
# Process sensor data
self.get_logger().info(f'Received: {msg.data}')
Services
Services provide synchronous request-response communication. A client sends a request to a service, and the service processes the request and returns a response. Services are ideal for operations that require immediate results.
To create a service server:
from example_interfaces.srv import AddTwoInts
def create_service(self):
self.service = self.create_service(
AddTwoInts,
'add_two_ints',
self.add_callback)
def add_callback(self, request, response):
response.sum = request.a + request.b
return response
To call a service from a client:
def call_service(self):
client = self.create_client(AddTwoInts, 'add_two_ints')
request = AddTwoInts.Request()
request.a = 1
request.b = 2
future = client.call_async(request)
Actions
Actions are used for long-running tasks that provide feedback during execution. They combine the benefits of topics and services, allowing for goal setting, feedback, and result retrieval.
An action has three components:
- Goal: The task to be performed
- Feedback: Progress updates during execution
- Result: The final outcome of the action
Real-time Communication and DDS
ROS 2 uses DDS (Data Distribution Service) as its underlying communication middleware. DDS provides real-time, scalable, and reliable communication between nodes.
Key DDS features:
- Quality of Service (QoS) policies allow fine-tuning of communication behavior
- Built-in discovery mechanism for automatic node connection
- Support for multiple DDS implementations (Fast DDS, Cyclone DDS, RTI Connext)
QoS policies control how messages are delivered:
- Reliability: Best effort or reliable delivery
- Durability: Whether late-joining subscribers receive old messages
- History: How many messages to store
- Deadline: Maximum time between samples
Using rclpy for Python-based Robot Control
rclpy is the Python client library for ROS 2. It provides the Python API to interact with ROS 2 concepts like nodes, topics, services, and actions.
Setting up a Basic Robot Controller
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
class BasicRobotController(Node):
def __init__(self):
super().__init__('basic_robot_controller')
# Create publisher for robot movement commands
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Create subscriber for laser scan data
self.scan_subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)
# Timer for control loop
self.timer = self.create_timer(0.1, self.control_loop)
self.obstacle_detected = False
def scan_callback(self, msg):
# Check for obstacles in front of the robot
min_distance = min(msg.ranges)
self.obstacle_detected = min_distance < 1.0 # 1 meter threshold
def control_loop(self):
msg = Twist()
if self.obstacle_detected:
# Stop the robot if obstacle detected
msg.linear.x = 0.0
msg.angular.z = 0.5 # Turn to avoid obstacle
else:
# Move forward
msg.linear.x = 0.5
msg.angular.z = 0.0
self.cmd_vel_publisher.publish(msg)
Launching the Node
To run the node, create a main function:
def main(args=None):
rclpy.init(args=args)
robot_controller = BasicRobotController()
rclpy.spin(robot_controller)
robot_controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Basic Robot Control Workflow
1. Environment Setup
Before implementing robot control, ensure your ROS 2 environment is properly configured:
- Source your ROS 2 installation:
source /opt/ros/humble/setup.bash - Create a workspace:
mkdir -p ~/ros2_ws/src - Build your packages:
colcon build --packages-select your_package_name
2. Node Design
Design your control system with clear separation of concerns:
- Sensor processing nodes
- Decision-making nodes
- Actuator command nodes
- State monitoring nodes
3. Communication Architecture
Plan your topics, services, and actions carefully:
- Use appropriate message types for your robot's sensors and actuators
- Design custom message types when needed
- Consider QoS settings for real-time requirements
4. Control Loop Implementation
Implement a robust control loop that:
- Processes sensor data at appropriate frequencies
- Makes decisions based on current state
- Sends commands to actuators
- Handles error conditions gracefully
5. Testing and Validation
Test your robot control system:
- Use simulation environments first
- Validate individual components before integration
- Implement safety mechanisms to prevent hardware damage
Practical Example: Simple Navigation Node
Here's a complete example of a simple navigation node that moves a robot toward a goal while avoiding obstacles:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, Point
from sensor_msgs.msg import LaserScan
import math
class SimpleNavigator(Node):
def __init__(self):
super().__init__('simple_navigator')
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
self.scan_subscription = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10)
self.goal_x = 5.0 # Goal position in x
self.goal_y = 5.0 # Goal position in y
self.current_x = 0.0
self.current_y = 0.0
self.timer = self.create_timer(0.1, self.navigation_loop)
def scan_callback(self, msg):
# Process laser scan to detect obstacles
pass # Implementation depends on specific sensor layout
def navigation_loop(self):
msg = Twist()
# Calculate distance to goal
dist_to_goal = math.sqrt(
(self.goal_x - self.current_x)**2 +
(self.goal_y - self.current_y)**2)
if dist_to_goal > 0.5: # If not at goal
# Move toward goal
msg.linear.x = min(0.5, dist_to_goal * 0.5)
msg.angular.z = 0.0 # Simplified navigation
else:
# At goal, stop
msg.linear.x = 0.0
msg.angular.z = 0.0
self.cmd_vel_publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
navigator = SimpleNavigator()
rclpy.spin(navigator)
navigator.destroy_node()
rclpy.shutdown()
This example demonstrates the fundamental concepts of ROS 2 robot control, providing a foundation for more complex robotic applications.