Rosbag Helpers in the Artefacts Toolkit
The Artefacts Toolkit Rosbag Helpers provide convenient functions to create, record, and extract data from ROS bag files while running tests. These utilities help you capture topic data to be processed for analysis and visualisation.
Import with:
from artefacts_toolkit.rosbag import rosbag, image_topics, message_topics
Functions
rosbag.get_bag_recorder
image_topics.extract_camera_image
image_topics.extract_video
message_topics.get_final_message
Function Reference
rosbag.get_bag_recorder
Creates a rosbag2 recorder for a given list of topic names and returns the node and the filepath
rosbag.get_bag_recorder(
topic_names,
directory="rosbags",
use_sim_time=False
)
Parameters
Parameter | Type | Description | Default |
---|---|---|---|
topic_names |
list[str] |
List of ROS topics to record | Required |
directory |
str |
Directory where the rosbag will be saved | "rosbags" |
use_sim_time |
bool |
Whether to use simulation time instead of system time | False |
Returns
tuple
: Returns a tuple containing:
bag_recorder
(ExecuteProcess
): Launch Action that runs the recorder process.rosbag_filepath
(str
): Path to the rosbag file that will be created.
Notes
- File format will be MCAP.
- Rosbag files are automatically named with a timestamp in the format
"rosbag2_" + yyyymmddhhmmss
. - The
bag_recorder
process must be added to your LaunchDescription. - Rosbags are automatically uploaded to the artefacts dashboard unless you use the
--no upload
flag when usingartefacts run
.
Example
The following example shows a test launch file using the bag_recorder
helper to record a rosbag, add the bag_recorder
to the launch description, and later use the rosbag_filepath
for an assertion test (using another rosbag helper get_final_message
):
@pytest.mark.launch_test
def generate_test_description():
camera_topics = ["/depth_cam/rgb"]
extra_topics = ["/odom", "/noisy_estimate"]
bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
camera_topics + extra_topics, use_sim_time=False
)
test_odometry_node = ExecuteProcess(
cmd=[
"python3",
"src/test_odometry_node.py",
]
)
return LaunchDescription(
[
test_odometry_node,
launch_testing.actions.ReadyToTest(),
bag_recorder,
]
), {
"test_odometry_node": test_odometry_node,
"rosbag_filepath": rosbag_filepath,
}
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
def test_end_position(self, rosbag_filepath):
final_distance_from_start = message_topics.get_final_message(
rosbag_filepath, "/distance_from_start.data"
)
assert final_distance_from_start < 0.1, (
f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
)
image_topics.extract_camera_image
Returns the last recorded image from a provided camera topic.
image_topics.extract_camera_image(
rosbag_file_path,
camera_topic,
output_dir="output"
)
Parameters
Parameter | Type | Description | Default |
---|---|---|---|
rosbag_file_path |
str |
Path to the recorded rosbag | Required |
camera_topic |
str |
Name of the ROS camera topic to take the image from | Required |
output_dir |
str |
Directory where to save the extracted image to | "output" |
Returns
None
: The image will be saved to the output_dir
specified.
Notes
- Images are saved in the
.png
format - By setting
output_dir
to the same directory asoutput_dirs
in your artefacts.yaml file, the image will be automatically uploaded to the artefacts dashboard after conclusion of the test.
Example
The following example shows how to extract the last camera image from the rosbag after the test has concluded. We use the rosbag_filepath
returned by the rosbag.get_rosbag_recorder
function.
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
self, proc_info, test_odometry_node, rosbag_filepath
):
...
image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")
image_topics.extract_video
Creates a WebM video by combining all images from a provided camera topic.
image_topics.extract_video(
bag_path,
topic_name,
output_path,
frame_rate=20
)
Parameters
Parameter | Type | Description | Default |
---|---|---|---|
bag_path |
str |
Path to the recorded rosbag | Required |
topic_name |
str |
Name of the ROS camera topic to create the video from | Required |
output_path |
str |
Path where the video will be saved (.webm) | Required |
frame_rate |
int |
Frame rate to use for the created video | 20 |
Returns
None
: The video file will be saved to the output_path
specified.
Notes
- Videos will be saved in WebM format. It will be automatically set if not present in
output_path
- By saving to a directory specified in
output_dirs
in your artefacts.yaml file, the video will be automatically uploaded to the artefacts dashboard.
Example
The following example shows how to extract a video from the rosbag after the test has concluded. We use the rosbag_filepath
returned by the rosbag.get_rosbag_recorder
function.
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
self, proc_info, test_odometry_node, rosbag_filepath
):
...
image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")
message_topics.get_final_message
Retrieves the final message from a specified topic in a rosbag, with optional attribute access using dot notation.
message_topics.get_final_message(
rosbag_filepath,
topic
)
Parameters
Parameter | Type | Description | Default |
---|---|---|---|
rosbag_filepath |
str |
Path to the recorded rosbag | Required |
topic |
str |
Topic name. Use dot notation to drill down message attributes (e.g., "/distance.data") | Required |
Returns
Any
: Returns the value of the specified message attribute. The type depends on the accessed field.
Notes
- Supports attribute access using dot notation (e.g., "/distance.data")
Example
We will use the same example from the rosbag.get_bag_recorder
explanation. Note we get the data from the distance_from_start topic in order to make our assertion test.
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
def test_end_position(self, rosbag_filepath):
final_distance_from_start = message_topics.get_final_message(
rosbag_filepath, "/distance_from_start.data"
)
assert final_distance_from_start < 0.1, (
f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
)