Skip to main content

Mastering ROS 2 for Robotic Control

What is ROS 2 and Why is it Used?

ROS 2 (Robot Operating System 2) is a flexible framework for writing robot software. Unlike traditional operating systems, ROS 2 is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robot platforms.

ROS 2 is preferred over its predecessor due to:

  • Improved security features for multi-robot systems
  • Better real-time support
  • Enhanced cross-platform compatibility
  • Modern build system using CMake
  • Better support for commercial applications

Core Communication Patterns

Nodes

Nodes are the fundamental building blocks of ROS 2 applications. Each node performs a specific task and communicates with other nodes through topics, services, or actions. A typical robot might have nodes for sensor processing, motion planning, and control.

To create a node in Python:

import rclpy
from rclpy.node import Node

class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Node initialization code here

Topics

Topics enable asynchronous communication between nodes using a publish-subscribe model. Publishers send messages to a topic, and subscribers receive those messages. This decouples nodes in time and space.

Example of publishing to a topic:

from std_msgs.msg import String

def create_publisher(self):
self.publisher = self.create_publisher(String, 'robot_commands', 10)

Example of subscribing to a topic:

def create_subscriber(self):
self.subscription = self.create_subscription(
String,
'sensor_data',
self.sensor_callback,
10)

def sensor_callback(self, msg):
# Process sensor data
self.get_logger().info(f'Received: {msg.data}')

Services

Services provide synchronous request-response communication. A client sends a request to a service, and the service processes the request and returns a response. Services are ideal for operations that require immediate results.

To create a service server:

from example_interfaces.srv import AddTwoInts

def create_service(self):
self.service = self.create_service(
AddTwoInts,
'add_two_ints',
self.add_callback)

def add_callback(self, request, response):
response.sum = request.a + request.b
return response

To call a service from a client:

def call_service(self):
client = self.create_client(AddTwoInts, 'add_two_ints')
request = AddTwoInts.Request()
request.a = 1
request.b = 2
future = client.call_async(request)

Actions

Actions are used for long-running tasks that provide feedback during execution. They combine the benefits of topics and services, allowing for goal setting, feedback, and result retrieval.

An action has three components:

  • Goal: The task to be performed
  • Feedback: Progress updates during execution
  • Result: The final outcome of the action

Real-time Communication and DDS

ROS 2 uses DDS (Data Distribution Service) as its underlying communication middleware. DDS provides real-time, scalable, and reliable communication between nodes.

Key DDS features:

  • Quality of Service (QoS) policies allow fine-tuning of communication behavior
  • Built-in discovery mechanism for automatic node connection
  • Support for multiple DDS implementations (Fast DDS, Cyclone DDS, RTI Connext)

QoS policies control how messages are delivered:

  • Reliability: Best effort or reliable delivery
  • Durability: Whether late-joining subscribers receive old messages
  • History: How many messages to store
  • Deadline: Maximum time between samples

Using rclpy for Python-based Robot Control

rclpy is the Python client library for ROS 2. It provides the Python API to interact with ROS 2 concepts like nodes, topics, services, and actions.

Setting up a Basic Robot Controller

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan

class BasicRobotController(Node):
def __init__(self):
super().__init__('basic_robot_controller')

# Create publisher for robot movement commands
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)

# Create subscriber for laser scan data
self.scan_subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)

# Timer for control loop
self.timer = self.create_timer(0.1, self.control_loop)

self.obstacle_detected = False

def scan_callback(self, msg):
# Check for obstacles in front of the robot
min_distance = min(msg.ranges)
self.obstacle_detected = min_distance < 1.0 # 1 meter threshold

def control_loop(self):
msg = Twist()

if self.obstacle_detected:
# Stop the robot if obstacle detected
msg.linear.x = 0.0
msg.angular.z = 0.5 # Turn to avoid obstacle
else:
# Move forward
msg.linear.x = 0.5
msg.angular.z = 0.0

self.cmd_vel_publisher.publish(msg)

Launching the Node

To run the node, create a main function:

def main(args=None):
rclpy.init(args=args)
robot_controller = BasicRobotController()
rclpy.spin(robot_controller)
robot_controller.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Basic Robot Control Workflow

1. Environment Setup

Before implementing robot control, ensure your ROS 2 environment is properly configured:

  • Source your ROS 2 installation: source /opt/ros/humble/setup.bash
  • Create a workspace: mkdir -p ~/ros2_ws/src
  • Build your packages: colcon build --packages-select your_package_name

2. Node Design

Design your control system with clear separation of concerns:

  • Sensor processing nodes
  • Decision-making nodes
  • Actuator command nodes
  • State monitoring nodes

3. Communication Architecture

Plan your topics, services, and actions carefully:

  • Use appropriate message types for your robot's sensors and actuators
  • Design custom message types when needed
  • Consider QoS settings for real-time requirements

4. Control Loop Implementation

Implement a robust control loop that:

  • Processes sensor data at appropriate frequencies
  • Makes decisions based on current state
  • Sends commands to actuators
  • Handles error conditions gracefully

5. Testing and Validation

Test your robot control system:

  • Use simulation environments first
  • Validate individual components before integration
  • Implement safety mechanisms to prevent hardware damage

Practical Example: Simple Navigation Node

Here's a complete example of a simple navigation node that moves a robot toward a goal while avoiding obstacles:

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, Point
from sensor_msgs.msg import LaserScan
import math

class SimpleNavigator(Node):
def __init__(self):
super().__init__('simple_navigator')

self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
self.scan_subscription = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10)

self.goal_x = 5.0 # Goal position in x
self.goal_y = 5.0 # Goal position in y

self.current_x = 0.0
self.current_y = 0.0

self.timer = self.create_timer(0.1, self.navigation_loop)

def scan_callback(self, msg):
# Process laser scan to detect obstacles
pass # Implementation depends on specific sensor layout

def navigation_loop(self):
msg = Twist()

# Calculate distance to goal
dist_to_goal = math.sqrt(
(self.goal_x - self.current_x)**2 +
(self.goal_y - self.current_y)**2)

if dist_to_goal > 0.5: # If not at goal
# Move toward goal
msg.linear.x = min(0.5, dist_to_goal * 0.5)
msg.angular.z = 0.0 # Simplified navigation
else:
# At goal, stop
msg.linear.x = 0.0
msg.angular.z = 0.0

self.cmd_vel_publisher.publish(msg)

def main(args=None):
rclpy.init(args=args)
navigator = SimpleNavigator()
rclpy.spin(navigator)
navigator.destroy_node()
rclpy.shutdown()

This example demonstrates the fundamental concepts of ROS 2 robot control, providing a foundation for more complex robotic applications.