Simulation Interfaces#

Simulation Interfaces Test Script#

In this module, we will analyze a script that leverages service interfaces to orchestrate a multi-robot workflow. We will walk through the code in detail, uncommenting sections as we go, then finally run it.

Setup#

  1. In Isaac Sim, open the warehouse test scene from the course assets folder: Starting_Point/warehouse_test_scene.usd

  2. In your code-editor, open the script in /sim_control_script_and_carter_ws/learning_simulation_control/workshop_learning_script.py

The carter1_sequence#

We are now working on the carter1_sequence function on our workshop script.

This sequence includes the following steps:

  1. Drive the Nav2-controlled Carter to the table near a cube.

  2. Spawn and position a Limo pusher if needed.

  3. Limo “nudges” the cube off the table.

  4. Reset and randomize the cube location, repeat the routine again.

Let’s take a closer look at the code together. Search for the function carter1_sequence

The robot is initially placed at [-4, -4, 0] on the warehouse map:

1# Set default initial pose if not provided
2if initial_pose is None:
3    initial_pose = [(-4.0, -4.0, 0.0), yaw_to_quaternion(0.0)]

The drop off zone is located at [-3.5, -4, 0], like you have seen in the Isaac Sim scene. We also set up the prim paths for the table and limo robot. Note the namespace for the limo robot. One limo robot is setup for each carter sequence.

1    # dropOff_pose position
2    # Format: [(x, y, z), quaternion_tuple]
3    carter1_dropOff_pose = [(-3.5, -4.0, 0.0), yaw_to_quaternion(-1.5708)]
4
5    demo_table_path = "/World/Demo_table_1/thor_table"
6    demo_cube_path = "/World/Demo_table_1/nvidia_cube"
7
8    limo_namespace = f"{robot_name}_limo"
9    limo_name = f"/World/{limo_namespace}"

We are now initializing the main object of this workshop, the TestContext, that will be analyzed in detail after the carter1_sequence initialization.

1test_robot_context = TestContext(robot_name, initial_pose)

This section initializes the robot context and reads the table position.

 1# Grab the table position
 2success, table_state = await test_robot_context.get_entity_state(demo_table_path)
 3if not success:
 4    print(f"{robot_name}: Failed to get table state: {table_state}")
 5    return f"{robot_name} failed: Could not get table state"
 6print(f"{robot_name}: Table at {table_state.pose.position.x:.2f}, {table_state.pose.position.y:.2f}")
 7
 8table_x = table_state.pose.position.x
 9table_y = table_state.pose.position.y
10table_z = table_state.pose.position.z
11
12limo_rest_quat_theta = yaw_to_quaternion(0.0)
13limo_rest_pose = [(table_x-0.3, table_y-0.5, table_z+0.15), limo_rest_quat_theta]

Find the code under WORKSHOP 01 and uncomment that section.

The TestContext class#

Let’s analyze another key component of this script, the TestContext class.

TestContext is a small ROS 2 helper class that wraps the simulation service clients (get/set entity state, spawn, play/pause sim) and exposes convenient async methods that return (success, payload_or_error).

The clients are created in (e.g., “get_entity_state”, “spawn_entity”, “set_simulation_state”, etc.). Below you can see the list of all services this node uses to spawn, and control the simulation state.

1self.get_entity_service = self.node.create_client(GetEntityState, "get_entity_state")
2self.set_entity_service = self.node.create_client(SetEntityState, "set_entity_state")
3self.spawn_entity_service = self.node.create_client(SpawnEntity, "spawn_entity")
4self.set_simulation_state_service = self.node.create_client(SetSimulationState, "set_simulation_state")
5self.reset_simulation_service = self.node.create_client(ResetSimulation, "reset_simulation")
6self.get_entity_info_service = self.node.create_client(GetEntityInfo, "get_entity_info")
7self.publish_cmd_vel = None # to be set when the robot is spawned

Find the code under WORKSHOP TC-01 and uncomment that section.

The get_entity_info method#

Queries metadata about an entity via the GetEntityInfo service. This metadata could be:

  1. Does the entity exist?

  2. What type is it? Note, in Isaac Sim 5.1 we only return CATEGORY_OBJECT so far.

    Returns (True, info) on success or (False, error_message) on failure.

    Internally: builds a GetEntityInfo.Request, calls the async client, spins until the future completes, and checks Result.RESULT_OK.

Tip

To learn more about the unique simulation interface Result, see the msg type here.

 1req = GetEntityInfo.Request()
 2req.entity = entity_path
 3future = self.get_entity_info_service.call_async(req)
 4
 5while not future.done():
 6    rclpy.spin_once(self.node, timeout_sec=0.1)
 7    await asyncio.sleep(0.01)
 8
 9if future.result() is not None:
10    result = future.result()
11    if result.result.result == Result.RESULT_OK:
12        return True, result.info
13    else:
14        error_msg = result.result.error_message or f"Failed to get entity info (code: {result.result.result})"
15        print(f"Error getting entity info for {entity_path}: {error_msg}")
16        return False, error_msg

Find the code under WORKSHOP TC-02 and uncomment that section.

The get_entity_state method#

Reads the current EntityState (pose + twist) for the given path via GetEntityState. Returns (True, state) or (False, error_message)

 1req = GetEntityState.Request()
 2req.entity = entity_path
 3future = self.get_entity_service.call_async(req)
 4
 5while not future.done():
 6    rclpy.spin_once(self.node, timeout_sec=0.1)
 7    await asyncio.sleep(0.01)
 8
 9if future.result() is not None:
10    result = future.result()
11    if result.result.result == Result.RESULT_OK:
12        return True, result.state
13    else:
14        error_msg = result.result.error_message or f"Failed to get entity state (code: {result.result.result})"
15        print(f"Error getting entity state for {entity_path}: {error_msg}")
16        return False, error_msg

Find the code under WORKSHOP TC-03 and uncomment that section.

The set_simulation_state method#

Using the SetSimulationState service, this method switches the simulation between states, such as

  • SimulationState.STATE_PLAYING

  • SimulationState.STATE_STOPPED

    Returns (True, None) on success or (False, error_message) on failure.

 1req = SetSimulationState.Request()
 2req.state.state = state
 3future = self.set_simulation_state_service.call_async(req)
 4
 5while not future.done():
 6    rclpy.spin_once(self.node, timeout_sec=0.1)
 7    await asyncio.sleep(0.01)
 8
 9if future.result() is not None:
10    result = future.result()
11    if result.result.result == Result.RESULT_OK:
12        return True, None
13    else:
14        error_msg = result.result.error_message or f"Failed to set simulation state (code: {result.result.result})"
15        print(f"Error setting simulation state: {error_msg}")
16        return False, error_msg

Find the code under WORKSHOP TC-04 and uncomment that section.

The spawn_entity method#

Spawns a new entity from a fully-formed SpawnEntity.Request. Returns (True, full_response, result_code) on success, or (False, error_message, result_code) on failure. (Convenience builder below: create_spawn_entity_request.)

 1future = self.spawn_entity_service.call_async(spawn_request)
 2
 3while not future.done():
 4    rclpy.spin_once(self.node, timeout_sec=0.1)
 5    await asyncio.sleep(0.01)
 6
 7result = future.result()
 8if result:
 9    if result.result.result == Result.RESULT_OK:
10        print(f"Successfully spawned entity: {spawn_request.name}")
11        return True, result, result.result.result
12    else:
13        error_msg = result.result.error_message or f"Failed to spawn entity (code: {result.result.result})"
14        print(f"Error spawning entity {spawn_request.name}: {error_msg}")
15        return False, error_msg, result.result.result

Find the code under WORKSHOP TC-05 and uncomment that section.

The create_spawn_entity_request method#

Convenience builder that returns a correctly structured SpawnEntity.Request (name, USD URI, PoseStamped with frame, position, quaternion, optional namespace). Handy to avoid boilerplate and to keep pose fields as plain tuples.

 1spawn_request.name = name
 2spawn_request.uri = uri
 3spawn_request.allow_renaming = allow_renaming
 4
 5# Create pose stamped
 6spawn_request.initial_pose = PoseStamped()
 7spawn_request.initial_pose.header.frame_id = frame_id
 8spawn_request.initial_pose.pose.position.x = float(position[0])
 9spawn_request.initial_pose.pose.position.y = float(position[1])
10spawn_request.initial_pose.pose.position.z = float(position[2])
11spawn_request.initial_pose.pose.orientation.w = float(orientation[0])
12spawn_request.initial_pose.pose.orientation.x = float(orientation[1])
13spawn_request.initial_pose.pose.orientation.y = float(orientation[2])
14spawn_request.initial_pose.pose.orientation.z = float(orientation[3])
15
16# Set namespace if provided
17if entity_namespace:
18    spawn_request.entity_namespace = entity_namespace

Find the code under WORKSHOP TC-06 and uncomment that section.

The set_entity_state method#

Writes an entity’s pose and (optionally) a linear/angular velocity using SetEntityState. You can either pass a ready EntityState message or let the method build one from tuples. Returns (True, None) on success or (False, error_message)

 1spawn_request.name = name
 2spawn_request.uri = uri
 3spawn_request.allow_renaming = allow_renaming
 4
 5# Create pose stamped
 6spawn_request.initial_pose = PoseStamped()
 7spawn_request.initial_pose.header.frame_id = frame_id
 8spawn_request.initial_pose.pose.position.x = float(position[0])
 9spawn_request.initial_pose.pose.position.y = float(position[1])
10spawn_request.initial_pose.pose.position.z = float(position[2])
11spawn_request.initial_pose.pose.orientation.w = float(orientation[0])
12spawn_request.initial_pose.pose.orientation.x = float(orientation[1])
13spawn_request.initial_pose.pose.orientation.y = float(orientation[2])
14spawn_request.initial_pose.pose.orientation.z = float(orientation[3])
15
16# Set namespace if provided
17if entity_namespace:
18    spawn_request.entity_namespace = entity_namespace

Find the code under WORKSHOP TC-07 and uncomment that section.

Implementing the robot sequence#

Now we are inside the main, looping function for the carter1 sequence simulation. The routine runs two full trials to demonstrate repeatability and reset logic.

Overview#

This code details how to fetch and monitor the cube’s pose, compute navigation waypoints, control the Limo robot to push the cube, reset both entities, and randomize cube position for iterative tabletop trials. If this fails, the sequence aborts with a message.

Spawn the World in Isaac Sim#

1req = LoadWorld.Request()
2req.uri = os.path.join(ROSCON25_ASSET_PATH, "Starting_Point/Empty_Warehouse_Scene.usd")
3future = load_world_client.call_async(req)
4rclpy.spin_until_future_complete(node, future)
5
6if future.result() and future.result().result.result == Result.RESULT_OK:
7    print("World loaded successfully")
8else:
9    print("Failed to load world")

Find the code under WORKSHOP 04 and uncomment that section.

This section builds a fresh SpawnEntity.Request with:

  • full path: /World/Demo_table_1/thor_table,

  • frame_id : world

  • position: (-4.72, 0.688, 1.19)

  • quaternion: identity

  • allow_renaming=False

    Calls spawn + checks result

 1table_req = SpawnEntity.Request()
 2table_req.name = "/World/Demo_table_1/thor_table"
 3table_req.uri = os.path.join(ROSCON25_ASSET_PATH, "thor_table/thor_table.usd")
 4table_req.allow_renaming = False
 5table_req.initial_pose = PoseStamped()
 6table_req.initial_pose.header.frame_id = "world"
 7table_req.initial_pose.pose.position.x = float(-4.72)
 8table_req.initial_pose.pose.position.y = float(0.688)
 9table_req.initial_pose.pose.position.z = float(1.19)
10table_req.initial_pose.pose.orientation.w = float(1.0)
11table_req.initial_pose.pose.orientation.x = float(0.0)
12table_req.initial_pose.pose.orientation.y = float(0.0)
13table_req.initial_pose.pose.orientation.z = float(0.0)
14future = spawn_entity_client.call_async(table_req)
15rclpy.spin_until_future_complete(node, future)
16
17if future.result() and future.result().result.result == Result.RESULT_OK:
18    print("Table spawned successfully")
19else:
20    print("Failed to spawn table")

Find the code WORKSHOP 05 and uncomment. This section spawns the nvidia cube at /World/Demo_table_1/nvidia_cube with:

  • full path: /World/Demo_table_1/nvidia_cube,

  • frame_id : world

  • Position (partially reused): (-4.72+0.15, 0.688, 1.29)

  • Quaternion (reused)

  • allow_renaming=True (because we can spawn multiple cubes without having to worry about naming manually)

The spawn pose is set in world coordinates, and the result is confirmed and printed.

 1table_req.name = "/World/Demo_table_1/nvidia_cube"
 2table_req.uri = os.path.join(ROSCON25_ASSET_PATH, "nvidia_cube/nvidia_cube.usd")
 3table_req.allow_renaming = True
 4table_req.initial_pose.header.frame_id = "world"
 5table_req.initial_pose.pose.position.x += 0.15
 6table_req.initial_pose.pose.position.z = float(1.29)
 7future = spawn_entity_client.call_async(table_req)
 8rclpy.spin_until_future_complete(node, future)
 9
10if future.result() and future.result().result.result == Result.RESULT_OK:
11    print("Cube spawned successfully")
12else:
13    print("Failed to spawn cube")

Find the code WORKSHOP 06 and uncomment.

This section spawns Nova Carter at /World/nova_carter_ROS_1 with:

  • entity_namespace=”carter1”

  • full path: /World/nova_carter_ROS_1,

  • frame_id : world

  • Position: (-4.0, -4.0, 0.0)

  • Quaternion: yaw of +90° using yaw_to_quaternion(1.5708).

  • allow_renaming=False (because we only intend to one Nova Carter spawned at this time) The spawn pose is set in world coordinates, and the result is confirmed and printed.

 1carter1_req = SpawnEntity.Request()
 2carter1_req.name = "/World/nova_carter_ROS_1"
 3carter1_req.uri = os.path.join(ROSCON25_ASSET_PATH, "Starting_Point/nova_carter_ROS.usd")
 4carter1_req.entity_namespace = "carter1"
 5carter1_req.allow_renaming = False
 6carter1_req.initial_pose = PoseStamped()
 7carter1_req.initial_pose.header.frame_id = "world"
 8carter1_req.initial_pose.pose.position.x = float(-4.0)
 9carter1_req.initial_pose.pose.position.y = float(-4.0)
10carter1_req.initial_pose.pose.position.z = float(0.0)
11quat = yaw_to_quaternion(1.5708)
12carter1_req.initial_pose.pose.orientation.w = float(quat[0])
13carter1_req.initial_pose.pose.orientation.x = float(quat[1])
14carter1_req.initial_pose.pose.orientation.y = float(quat[2])
15carter1_req.initial_pose.pose.orientation.z = float(quat[3])
16future = spawn_entity_client.call_async(carter1_req)
17rclpy.spin_until_future_complete(node, future)
18
19if future.result() and future.result().result.result == Result.RESULT_OK:
20    print("Carter1 spawned successfully")
21else:
22    print("Failed to spawn Carter1")

Find the code WORKSHOP 07 and uncomment.

Now, in this section, we will set the simulation state to SimulationState.STATE_PLAYING, spin ROS functions, and introduce a small delay to let simulation physics stacks settle.

1req = SetSimulationState.Request()
2req.state.state = SimulationState.STATE_PLAYING
3future = set_state_client.call_async(req)
4rclpy.spin_until_future_complete(node, future)
5
6if future.result() and future.result().result.result == Result.RESULT_OK:
7    print("Simulation playing successfully")
8else:
9    print("Failed to play simulation")

Find the code WORKSHOP 08 and uncomment.

When we stop the simulation, we set the state to SimulationState.STATE_STOPPED, spin ROS functions, and introduce a small delay to let simulation settle. This is essentially a clean pause before teardown.

1req = SetSimulationState.Request()
2req.state.state = SimulationState.STATE_STOPPED
3future = set_state_client.call_async(req)
4rclpy.spin_until_future_complete(node, future)
5
6if future.result() and future.result().result.result == Result.RESULT_OK:
7    print("Simulation is now stopped")
8else:
9    print("Failed to stop simulation")

Find the code WORKSHOP 09 and uncomment.

After that, we send a UnloadWorld.Request(), spin and wait for Isaac Sim to unload the world.

1req = UnloadWorld.Request()
2future = unload_world_client.call_async(req)
3rclpy.spin_until_future_complete(node, future)
4
5if future.result() and future.result().result.result == Result.RESULT_OK:
6    print("World unloaded successfully")
7else:
8    print("Failed to unload world")

Find the code WORKSHOP 10 and uncomment.

Testing the Script#

Now that we’ve analyzed the code in our script, let’s try it out.

Follow the instructions below to run the Nav2 stack:

Running the Nav Stack With the Test Script#

  1. Open a new terminal and source your ROS workspace:

source sim_control_script_and_carter_ws/ros_ws/install/setup.bash

Ensure that the ROS_DOMAIN_ID environment variable is set.

export ROS_DOMAIN_ID=42 # Replace number with your own station number
  1. Start Nav2 for multi robot simulation for both Nova Carters by running the following command:

ros2 launch carter_navigation multiple_robot_carter_navigation_warehouse.launch.py
  1. From a ROS-sourced terminal set your ROS_DOMAIN_ID and run the script:

export ROS_DOMAIN_ID=42 # Replace number with your own station number
# cd into the course materials folder
python3 sim_control_script_and_carter_ws/learning_simulation_control/workshop_learning_script.py

Tip

If you need, the full output script from this module is found in the course materials undersim_control_script_and_carter_ws/learning_simulation_control/workshop_learning_script_final.py.

You may also run a similar command as above to run this script here:

export ROS_DOMAIN_ID=42 # Replace number with your own station number
# cd into the course materials folder
python3 sim_control_script_and_carter_ws/learning_simulation_control/workshop_learning_script_final.py

Tip

The backup robot file is located in the course assets under /Backup/nova_carter_ROS.usd.

If you need to use the backup robot USD file, copy and replace it into the Starting Point folder at: Starting_Point/nova_carter_ROS.usd.

Tip

If the Nav stack seems stuck, simply stop both the Nav2 stack and the current script command (CTRL+C) and re-run.