Artefacts ツールキットの Rosbag ヘルパー
Artefacts ツールキットの Rosbag ヘルパーは、テスト実行中に ROS バッグファイルを作成、記録、およびデータを抽出するための便利な関数を提供します。これらのユーティリティは、分析と可視化のために処理されるトピックデータをキャプチャするのに役立ちます。
インポート方法:
from artefacts_toolkit.rosbag import get_bag_recorder, image_topics, message_topics
関数
get_bag_recorder
image_topics.extract_camera_image
image_topics.extract_video
message_topics.get_final_message
関数リファレンス
get_bag_recorder
指定されたトピック名のリストに対する rosbag2 レコーダーを作成し、ノードとファイルパスを返します。
rosbag.get_bag_recorder(
topic_names,
directory="rosbags",
use_sim_time=False
)
パラメータ
パラメータ | 型 | 説明 | デフォルト |
---|---|---|---|
topic_names |
list[str] |
記録する ROS トピックのリスト | 必須 |
directory |
str |
rosbag が保存されるディレクトリ | "rosbags" |
use_sim_time |
bool |
システム時間の代わりにシミュレーション時間を使用するかどうか | False |
戻り値
tuple
: 以下を含むタプルを返します:
bag_recorder
(ExecuteProcess
): レコーダープロセスを実行する Launch Action。rosbag_filepath
(str
): 作成される rosbag ファイルへのパス。
Note
- ファイル形式は MCAP です。
- Rosbag ファイルは自動的に
"rosbag2_" + yyyymmddhhmmss
形式のタイムスタンプで名前が付けられます。 - The
bag_recorder
プロセスは LaunchDescription に追加する必要があります。 artefacts run
を使用する際に--no upload
フラグを使用しない限り、rosbag は自動的に artefacts ダッシュボードにアップロードされます。
例
以下の例では、 bag_recorder
ヘルパーを使用して rosbag を記録し、 bag_recorder
を起動記述に追加し、後で rosbag_filepath
をアサーションテスト(別の rosbag ヘルパー get_final_message
を使用)に使用するテスト起動ファイルを示しています:
@pytest.mark.launch_test
def generate_test_description():
camera_topics = ["/depth_cam/rgb"]
extra_topics = ["/odom", "/noisy_estimate"]
bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
camera_topics + extra_topics, use_sim_time=False
)
test_odometry_node = ExecuteProcess(
cmd=[
"python3",
"src/test_odometry_node.py",
]
)
return LaunchDescription(
[
test_odometry_node,
launch_testing.actions.ReadyToTest(),
bag_recorder,
]
), {
"test_odometry_node": test_odometry_node,
"rosbag_filepath": rosbag_filepath,
}
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
def test_end_position(self, rosbag_filepath):
final_distance_from_start = message_topics.get_final_message(
rosbag_filepath, "/distance_from_start.data"
)
assert final_distance_from_start < 0.1, (
f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
)
image_topics.extract_camera_image
提供されたカメラトピックから最後に記録された画像を返します。
image_topics.extract_camera_image(
rosbag_file_path,
camera_topic,
output_dir="output"
)
パラメータ
パラメータ | 型 | 説明 | デフォルト |
---|---|---|---|
rosbag_file_path |
str |
記録された rosbag へのパス | 必須 |
camera_topic |
str |
画像を取得する ROS カメラトピックの名前 | 必須 |
output_dir |
str |
抽出された画像を保存するディレクトリ | "output" |
戻り値
None
: 画像で指定された output_dir
に保存されます。
Note
- 画像は
.png
形式で保存されます。 output_dir
を artefacts.yaml ファイルのoutput_dirs
と同じディレクトリに設定することで、テスト終了後に画像が自動的に artefacts ダッシュボードにアップロードされます。
例
以下の例では、テスト終了後に rosbag から最後のカメラ画像を抽出する方法を示しています。 rosbag.get_rosbag_recorder
関数によって返された rosbag_filepath
を使用します。
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
self, proc_info, test_odometry_node, rosbag_filepath
):
...
image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")
image_topics.extract_video
提供されたカメラトピックからすべての画像を組み合わせて WebM ビデオを作成します。
image_topics.extract_video(
bag_path,
topic_name,
output_path,
frame_rate=20
)
パラメータ
パラメータ | 型 | 説明 | デフォルト |
---|---|---|---|
bag_path |
str |
記録された rosbag へのパス | 必須 |
topic_name |
str |
ビデオを作成する ROS カメラトピックの名前 | 必須 |
output_path |
str |
ビデオが保存されるパス (.webm) | 必須 |
frame_rate |
int |
作成されるビデオに使用するフレームレート | 20 |
戻り値
None
: ビデオファイルは指定された output_path
に保存されます。
注意
Note
- ビデオは WebM 形式で保存されます。
output_path
に存在しない場合は自動的に設定されます。 - artefacts.yaml ファイルの
output_dirs
で指定されたディレクトリに保存することで、ビデオは自動的に artefacts ダッシュボードにアップロードされます。
例
以下の例では、テスト終了後に rosbag からビデオを抽出する方法を示していますrosbag.get_rosbag_recorder
関数によって返された rosbag_filepath
を使用します。
from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
self, proc_info, test_odometry_node, rosbag_filepath
):
...
image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")
message_topics.get_final_message
rosbag 内の指定されたトピックから最終メッセージを取得します。オプションでドット表記を使用した属性アクセスが可能です。
message_topics.get_final_message(
rosbag_filepath,
topic
)
パラメータ
パラメータ | 型 | 説明 | デフォルト |
---|---|---|---|
rosbag_filepath |
str |
記録された rosbag へのパス | 必須 |
topic |
str |
トピック名。ドット表記を使用してメッセージ属性を掘り下げます(例:"/distance.data") | 必須 |
戻り値
Any
: 指定されたメッセージ属性の値を返します。型はアクセスされるフィールドによって異なります。
Note
- ドット表記を使用した属性アクセスをサポートしています(例:"/distance.data")
例
rosbag.get_bag_recorder
の説明と同じ例を使用します。アサーションテストを行うために distance_from_start トピックからデータを取得していることに注目してください。
@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
def test_end_position(self, rosbag_filepath):
final_distance_from_start = message_topics.get_final_message(
rosbag_filepath, "/distance_from_start.data"
)
assert final_distance_from_start < 0.1, (
f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
)