This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

The Artefacts Toolkit (beta)

Using the Artefacts Toolkit

The Artefacts Toolkit is a Python package designed to improve developer productivity. It provides a collection of helper functions that simplify common testing tasks and integrate with the Artefacts platform.

The toolkit is organized into the following modules:

Installation

pip install artefacts-toolkit

Example Projects using the Artefacts Toolkit:

  • Nav2 - An example project using the Nav2 navigation stack and Gazebo.

1 - Configuration Helpers in the Artefacts Toolkit

The Artefacts Toolkit Configuration Helpers are functions that allow you to interact with configurations you have set in your artefacts.yaml file.

Import with:

from artefacts_toolkit.config import get_artefacts_param

Functions

Function Reference

get_artefacts_param

Returns a parameter set in the artefacts.yaml file. If param_type is set to launch, it will be returned as type string so that it can be used as a ROS launch argument.

This function is particularly useful for parametric testing. When you define a list of parameter values in your artefacts.yaml file (e.g., launch/world: ["empty.sdf", "bookstore.sdf", "restaurant.sdf"]), the artefacts run command will execute your test multiple times - once for each value in the list. During each test execution, get_artefacts_param("launch", "world") will automatically return the current parameter value for that specific test run.

This becomes especially powerful for grid-based testing! With three parameters that each have three possible values, Artefacts will automatically execute your test 27 times (3 × 3 × 3) while your launch file code remains unchanged.

get_artefacts_param(
    param_type,
    param_name,
    default=None,
    is_ros=True
)

Parameters

Parameter Type Description Default
param_type str The namespace/category of the parameter (e.g., “launch” in “launch/world”) Required
param_name str The specific parameter name (e.g., “world” in “launch/world”) Required
default any Value to return if the parameter isn’t found in artefacts.yaml. None
is_ros bool Whether the parameter should be converted to a ROS parameter format True

Returns

The function returns the parameter value with the following behavior:

  • If param_type is "launch" and is_ros is True: Returns the value as a str, regardless of its original type in the artefacts.yaml file. This is so that it can be used as a ros launch argument.
  • if default is set to anything other than None, then the value will be returned if artefacts is unable to find the requested parameter. This can be useful when (for example) using launch_test instead of artefacts run but you do not wish to make any code changes. It will also prevent KeyError exceptions.
  • For all other cases: Returns the value with its original type from the YAML file (e.g., list, dict, int, float, str, etc.)

Example

Given an artefacts.yaml file with the following configuration set:

scenarios:
  defaults:
    output_dirs: ["output"]
    metrics:
        - /odometry_error
    params:
      launch/world: ["bookstore.sdf", "empty.sdf"]

Getting the parameter in our test launch file like so:

def generate_test_description():
    try:
        world = get_artefacts_param("launch", "world")
    except FileNotFoundError:
        world = "empty.world"

    run_headless = LaunchConfiguration("run_headless")
    launch_navigation_stack = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            [
                os.path.join(
                    get_package_share_directory("sam_bot_nav2_gz"),
                    "launch",
                    "complete_navigation.launch.py"
                ),
            ]
        ),
        launch_arguments=[("run_headless", run_headless), ("world_file", world)],
    )

...# Rest of launch test file   

This will run the same test twice - once in an empty world and once in a bookstore world - without any changes to your launch file code between runs.

2 - Chart Helpers in the Artefacts Toolkit

The Artefacts Toolkit Chart helpers are designed to help you with visualising data received from topics while running tests.

Import with:

from artefacts_toolkit.chart import make_chart

Functions

Function Reference

make_chart

Creates an interactive HTML chart based on data from two provided topics.

make_chart(
    filepath,
    topic_x,
    topic_y,
    field_unit=None,
    output_dir="output",
    chart_name="chart",
    file_type="rosbag"
)

Parameters

Parameter Type Description Default
filepath str Path to the data file (rosbag) Required
topic_x str Topic name for x-axis. Use “Time” to plot against time Required
topic_y str Topic name for y-axis. Use “Time” to plot against time Required
field_unit str Unit of measurement for the field data (e.g., “m/s”, “rad”) None
output_dir str Directory where the chart will be saved "output"
chart_name str Name of the generated chart file "chart"
file_type str Type of data file. Currently supports “rosbag” "rosbag"

Returns

None: Creates an html chart <chart_name>.html at output_dir but doesn’t return any value.

Example

The following example adds the make_chart function post shutdown, i.e after the test has completed and rosbag saved.

# my_test_file.launch.py
# test code

...

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_exit_code(self, rosbag_filepath):
        make_chart(
            rosbag_filepath,
            "/odom.pose.pose.position.x",
            "/odom.pose.pose.position.y",
            field_unit="m",
            chart_name="odometry_position",
        )

Producing the following chart:

Example Chart

3 - Gazebo Helpers in the Artefacts Toolkit

The Artefacts Toolkit Gazebo Helpers provide convenient functions to interact with your Gazebo simulations while running tests. These utilities allow you to inspect simulation objects, access model positions, as well as bridge topics between Gazebo and ROS2.

Import with:

from artefacts_toolkit.gazebo import bridge, gz

Functions

Function Reference

bridge.get_camera_bridge

Creates a gazebo / ros2 topic bridge for a camera topic

bridge.get_camera_bridge(
    topic_name,
    condition=None
)

Parameters

Parameter Type Description Default
topic_name str The camera topic name to bridge from Gazebo to ROS2 Required
condition str Optional launch condition that determins when this bridge should be created None

Returns

Node: Returns a ROS 2 Node object that runs the parameter_bridge for the specified camera topic. This node can be included in your launch description.

Example

The following example shows a camera bridge being created conditionally based on a launch argument. When record_video is set to “true”, the bridge will be activated, allowing ROS 2 nodes to subscribe to camera images from Gazebo. The returned Node is added to the LaunchDescription to be included in the launch process.

@pytest.mark.launch_test
def generate_test_description():
    from artefacts_toolkit.gazebo import bridge
    ...

    camera_topic = "/observation_camera/image"
    bag_recorder, rosbag_filepath = get_bag_recorder([camera_topic])
    sim = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(["bringup", "/robot.launch.py"])
    )

    record_video_launch_arg = DeclareLaunchArgument(
        "record_video", default_value="true"
    )
    record_video = LaunchConfiguration("record_video")
    camera_bridge = bridge.get_camera_bridge(camera_topic, condition=IfCondition(record_video))
    
    return LaunchDescription(
        [
            record_video_launch_arg,
            sim,
            camera_bridge,
            controller_process,
            launch_testing.actions.ReadyToTest(),
        ]
    )

gz.get_sim_objects

Extracts model information from a Gazebo world file by parsing its XML structure. This function returns the name and original pose of all models defined in the world file.

This can useful when you need to know the initial poses of models as defined in the world file, rather than querying the running simulation (where positions might have changed due to physics, randomness, or interactions).

gz.get_sim_objects(world_file)

Parameters

Parameter Type Description Default
world_file str Path to the Gazebo world file to parse Required

Returns

tuple: The function returns a tuple with two values that should be unpacked:

  • A list of dictionaries:
objects = [
    {
        "name": "model_1",
        "pose": "0 0 0 0 0 0"
    }, 
    {
        "name": "model_2",
        "pose": "1 2 3 0 0 0"
    },
    ...
]
  • A dictionary mapping model names to poses:
objects_positions = {
    "model_1": "0 0 0 0 0 0", 
    "model_2": "1 2 3 0 0 0",
    ...
}

Example

# Working with the objects_position dict
_, objects_positions = gz.get_sim_objects("world.sdf")
model_pose = objects_positions["model_1"]  # "0 0 0 0 0 0"

# Working with the objects list
objects, _ = gz.get_sim_objects("world.sdf")
for model in objects:
    print(f"{model['name']} is at position {model['pose']}")

gz.get_model_location

Gets the current (x, y, z) position of a model in a running Gazebo simulation. Unlike get_sim_objects which reads original positions from a world file, this function queries the live simulation to get the model’s current position.

gz.get_model_location(model_name)

Parameters

Parameter Type Description Default
model_name str Name of the model to query in the simulation Required

Returns

tuple: Returns a tuple of three float values representing the (x, y, z) position in meters:

(x_position, y_position, z_position)

Example

The example below demonstrates how to combine get_sim_objects and get_model_location to validate a pick-and-place task. It shows how to:

  1. Get initial positions from the world file
  2. Wait for a robot to complete its task
  3. Compare initial positions with current positions to detect which objects were moved
class TestProcOutput(unittest.TestCase):
    def test_moved_meatballs(self, proc_output, controller_process):
        # Original locations
        sim_objects, sim_objects_positions = gz.get_sim_objects("worlds/env.sdf") # this includes models poses
        meatball_models = [obj["name"] for obj in sim_objects if "karaage" in obj["name"]]
        #  Wait for the control loop to finish
        proc_output.assertWaitFor("Done with tasks execution", timeout=300)
        picked_meatballs = 0 # karaage moved for more than 10cm
        for meatball in meatball_models:
            x, y, z = gz.get_model_location(meatball)
            x_original, y_original, z_original = sim_objects_positions[meatball]
            dist = ((x - x_original) ** 2 + (y - y_original) ** 2 + (z - z_original) ** 2) ** 0.5
            if dist > 0.1: #10cm
                # Likely to have been picked and moved somewhere else
                picked_meatballs += 1
        self.assertEqual(picked_meatballs, 4)

gz.kill_gazebo

Kills the currently running gazebo process.

gz.kill_gazebo()

Returns

None: This function doesn’t return any value.

Example

from artefacts_toolkit.gazebo import gz
...

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_exit_code(self, proc_info, controller_process, rosbag_filepath):
        gz.kill_gazebo()
        ...

4 - Rosbag Helpers in the Artefacts Toolkit

The Artefacts Toolkit Rosbag Helpers provide convenient functions to create, record, and extract data from ROS bag files while running tests. These utilities help you capture topic data to be processed for analysis and visualisation.

Import with:

from artefacts_toolkit.rosbag import get_bag_recorder, image_topics, message_topics

Functions

Function Reference

get_bag_recorder

Creates a rosbag2 recorder for a given list of topic names and returns the node and the filepath

rosbag.get_bag_recorder(
    topic_names,
    directory="rosbags",
    use_sim_time=False
)

Parameters

Parameter Type Description Default
topic_names list[str] List of ROS topics to record Required
directory str Directory where the rosbag will be saved "rosbags"
use_sim_time bool Whether to use simulation time instead of system time False

Returns

tuple: Returns a tuple containing:

  • bag_recorder(ExecuteProcess): Launch Action that runs the recorder process.
  • rosbag_filepath(str): Path to the rosbag file that will be created.

Example

The following example shows a test launch file using the bag_recorder helper to record a rosbag, add the bag_recorder to the launch description, and later use the rosbag_filepath for an assertion test (using another rosbag helper get_final_message):

@pytest.mark.launch_test
def generate_test_description():
    camera_topics = ["/depth_cam/rgb"]
    extra_topics = ["/odom", "/noisy_estimate"]

    bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
        camera_topics + extra_topics, use_sim_time=False
    )

    test_odometry_node = ExecuteProcess(
        cmd=[
            "python3",
            "src/test_odometry_node.py",
        ]
    )
    return LaunchDescription(
        [
            test_odometry_node,
            launch_testing.actions.ReadyToTest(),
            bag_recorder,
        ]
    ), {
        "test_odometry_node": test_odometry_node,
        "rosbag_filepath": rosbag_filepath,
    }


@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )

image_topics.extract_camera_image

Returns the last recorded image from a provided camera topic.

image_topics.extract_camera_image(
    rosbag_file_path,
    camera_topic,
    output_dir="output"
)

Parameters

Parameter Type Description Default
rosbag_file_path str Path to the recorded rosbag Required
camera_topic str Name of the ROS camera topic to take the image from Required
output_dir str Directory where to save the extracted image to "output"

Returns

None: The image will be saved to the output_dir specified.

Example

The following example shows how to extract the last camera image from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")

image_topics.extract_video

Creates a WebM video by combining all images from a provided camera topic.

image_topics.extract_video(
    bag_path,
    topic_name,
    output_path,
    frame_rate=20
)

Parameters

Parameter Type Description Default
bag_path str Path to the recorded rosbag Required
topic_name str Name of the ROS camera topic to create the video from Required
output_path str Path where the video will be saved (.webm) Required
frame_rate int Frame rate to use for the created video 20

Returns

None: The video file will be saved to the output_path specified.

Notes

Example

The following example shows how to extract a video from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")

message_topics.get_final_message

Retrieves the final message from a specified topic in a rosbag, with optional attribute access using dot notation.

message_topics.get_final_message(
    rosbag_filepath,
    topic
)

Parameters

Parameter Type Description Default
rosbag_filepath str Path to the recorded rosbag Required
topic str Topic name. Use dot notation to drill down message attributes (e.g., “/distance.data”) Required

Returns

Any: Returns the value of the specified message attribute. The type depends on the accessed field.

Example

We will use the same example from the rosbag.get_bag_recorder explanation. Note we get the data from the distance_from_start topic in order to make our assertion test.

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )