Skip to content

Rosbag Helpers in the Artefacts Toolkit

The Artefacts Toolkit Rosbag Helpers provide convenient functions to create, record, and extract data from ROS bag files while running tests. These utilities help you capture topic data to be processed for analysis and visualisation.

Import with:

from artefacts_toolkit.rosbag import rosbag, image_topics, message_topics

Functions

Function Reference

rosbag.get_bag_recorder

Creates a rosbag2 recorder for a given list of topic names and returns the node and the filepath

rosbag.get_bag_recorder(
    topic_names,
    directory="rosbags",
    use_sim_time=False
)

Parameters

Parameter Type Description Default
topic_names list[str] List of ROS topics to record Required
directory str Directory where the rosbag will be saved "rosbags"
use_sim_time bool Whether to use simulation time instead of system time False

Returns

tuple: Returns a tuple containing:

  • bag_recorder(ExecuteProcess): Launch Action that runs the recorder process.
  • rosbag_filepath(str): Path to the rosbag file that will be created.

Notes

  • File format will be MCAP.
  • Rosbag files are automatically named with a timestamp in the format "rosbag2_" + yyyymmddhhmmss.
  • The bag_recorder process must be added to your LaunchDescription.
  • Rosbags are automatically uploaded to the artefacts dashboard unless you use the --no upload flag when using artefacts run.

Example

The following example shows a test launch file using the bag_recorder helper to record a rosbag, add the bag_recorder to the launch description, and later use the rosbag_filepath for an assertion test (using another rosbag helper get_final_message):

@pytest.mark.launch_test
def generate_test_description():
    camera_topics = ["/depth_cam/rgb"]
    extra_topics = ["/odom", "/noisy_estimate"]

    bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
        camera_topics + extra_topics, use_sim_time=False
    )

    test_odometry_node = ExecuteProcess(
        cmd=[
            "python3",
            "src/test_odometry_node.py",
        ]
    )
    return LaunchDescription(
        [
            test_odometry_node,
            launch_testing.actions.ReadyToTest(),
            bag_recorder,
        ]
    ), {
        "test_odometry_node": test_odometry_node,
        "rosbag_filepath": rosbag_filepath,
    }


@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )

image_topics.extract_camera_image

Returns the last recorded image from a provided camera topic.

image_topics.extract_camera_image(
    rosbag_file_path,
    camera_topic,
    output_dir="output"
)

Parameters

Parameter Type Description Default
rosbag_file_path str Path to the recorded rosbag Required
camera_topic str Name of the ROS camera topic to take the image from Required
output_dir str Directory where to save the extracted image to "output"

Returns

None: The image will be saved to the output_dir specified.

Notes

  • Images are saved in the .png format
  • By setting output_dir to the same directory as output_dirs in your artefacts.yaml file, the image will be automatically uploaded to the artefacts dashboard after conclusion of the test.

Example

The following example shows how to extract the last camera image from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")

image_topics.extract_video

Creates a WebM video by combining all images from a provided camera topic.

image_topics.extract_video(
    bag_path,
    topic_name,
    output_path,
    frame_rate=20
)

Parameters

Parameter Type Description Default
bag_path str Path to the recorded rosbag Required
topic_name str Name of the ROS camera topic to create the video from Required
output_path str Path where the video will be saved (.webm) Required
frame_rate int Frame rate to use for the created video 20

Returns

None: The video file will be saved to the output_path specified.

Notes

  • Videos will be saved in WebM format. It will be automatically set if not present in output_path
  • By saving to a directory specified in output_dirs in your artefacts.yaml file, the video will be automatically uploaded to the artefacts dashboard.

Example

The following example shows how to extract a video from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")

message_topics.get_final_message

Retrieves the final message from a specified topic in a rosbag, with optional attribute access using dot notation.

message_topics.get_final_message(
    rosbag_filepath,
    topic
)

Parameters

Parameter Type Description Default
rosbag_filepath str Path to the recorded rosbag Required
topic str Topic name. Use dot notation to drill down message attributes (e.g., "/distance.data") Required

Returns

Any: Returns the value of the specified message attribute. The type depends on the accessed field.

Notes

  • Supports attribute access using dot notation (e.g., "/distance.data")

Example

We will use the same example from the rosbag.get_bag_recorder explanation. Note we get the data from the distance_from_start topic in order to make our assertion test.

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )