Rosbag Helpers in the Artefacts Toolkit
The Artefacts Toolkit Rosbag Helpers provide convenient functions to create, record, and extract data from ROS bag files while running tests. These utilities help you capture topic data to be processed for analysis and visualisation.
Import with:
from artefacts_toolkit.rosbag import get_bag_recorder, image_topics, message_topics
Functions
- get_bag_recorder
- image_topics.extract_camera_image
- image_topics.extract_video
- message_topics.get_final_message
Function Reference
get_bag_recorder
Creates a rosbag2 recorder for a given list of topic names and returns the node and the filepath
rosbag.get_bag_recorder(
    topic_names,
    directory="rosbags",
    use_sim_time=False
)
Parameters
| Parameter | Type | Description | Default | 
|---|---|---|---|
| topic_names | list[str] | List of ROS topics to record | Required | 
| directory | str | Directory where the rosbag will be saved | "rosbags" | 
| use_sim_time | bool | Whether to use simulation time instead of system time | False | 
Returns
tuple: Returns a tuple containing:
- bag_recorder(- ExecuteProcess): Launch Action that runs the recorder process.
- rosbag_filepath(- str): Path to the rosbag file that will be created.
Note
- File format will be MCAP.
- Rosbag files are automatically named with a timestamp in the format "rosbag2_" + yyyymmddhhmmss.
- The bag_recorderprocess must be added to your LaunchDescription.
- Rosbags are automatically uploaded to the artefacts dashboard unless you use the --no uploadflag when usingartefacts run.
Example
The following example shows a test launch file using the bag_recorder helper to record a rosbag, add the bag_recorder to the launch description, and later use the rosbag_filepath for an assertion test (using another rosbag helper get_final_message):
@pytest.mark.launch_test
def generate_test_description():
    camera_topics = ["/depth_cam/rgb"]
    extra_topics = ["/odom", "/noisy_estimate"]
    bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
        camera_topics + extra_topics, use_sim_time=False
    )
    test_odometry_node = ExecuteProcess(
        cmd=[
            "python3",
            "src/test_odometry_node.py",
        ]
    )
    return LaunchDescription(
        [
            test_odometry_node,
            launch_testing.actions.ReadyToTest(),
            bag_recorder,
        ]
    ), {
        "test_odometry_node": test_odometry_node,
        "rosbag_filepath": rosbag_filepath,
    }
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )
        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )
image_topics.extract_camera_image
Returns the last recorded image from a provided camera topic.
image_topics.extract_camera_image(
    rosbag_file_path,
    camera_topic,
    output_dir="output"
)
Parameters
| Parameter | Type | Description | Default | 
|---|---|---|---|
| rosbag_file_path | str | Path to the recorded rosbag | Required | 
| camera_topic | str | Name of the ROS camera topic to take the image from | Required | 
| output_dir | str | Directory where to save the extracted image to | "output" | 
Returns
None: The image will be saved to the output_dir specified.
Note
- Images are saved in the .pngformat
- By setting output_dirto the same directory asoutput_dirsin your artefacts.yaml file, the image will be automatically uploaded to the artefacts dashboard after conclusion of the test.
Example
The following example shows how to extract the last camera image from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...
    image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")
image_topics.extract_video
Creates a WebM video by combining all images from a provided camera topic.
image_topics.extract_video(
    bag_path,
    topic_name,
    output_path,
    frame_rate=20
)
Parameters
| Parameter | Type | Description | Default | 
|---|---|---|---|
| bag_path | str | Path to the recorded rosbag | Required | 
| topic_name | str | Name of the ROS camera topic to create the video from | Required | 
| output_path | str | Path where the video will be saved (.webm) | Required | 
| frame_rate | int | Frame rate to use for the created video | 20 | 
Returns
None: The video file will be saved to the output_path specified.
Notes
Note
- Videos will be saved in WebM format. It will be automatically set if not present in output_path
- By saving to a directory specified in output_dirsin your artefacts.yaml file, the video will be automatically uploaded to the artefacts dashboard.
Example
The following example shows how to extract a video from the rosbag after the test has concluded. We use the rosbag_filepath returned by the rosbag.get_rosbag_recorder function.
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...
    image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")
message_topics.get_final_message
Retrieves the final message from a specified topic in a rosbag, with optional attribute access using dot notation.
message_topics.get_final_message(
    rosbag_filepath,
    topic
)
Parameters
| Parameter | Type | Description | Default | 
|---|---|---|---|
| rosbag_filepath | str | Path to the recorded rosbag | Required | 
| topic | str | Topic name. Use dot notation to drill down message attributes (e.g., “/distance.data”) | Required | 
Returns
Any:  Returns the value of the specified message attribute. The type depends on the accessed field.
Note
- Supports attribute access using dot notation (e.g., “/distance.data”)
Example
We will use the same example from the rosbag.get_bag_recorder explanation. Note we get the data from the distance_from_start topic in order to make our assertion test.
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )
        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )