これは、このセクションの複数ページの印刷可能なビューです。 印刷するには、ここをクリックしてください.

このページの通常のビューに戻る.

Artefacts ツールキット (ベータ版)

Artefacts ツールキットの使用方法

Artefacts ツールキットは、開発者の生産性を向上させるために設計された Python パッケージです。一般的なテストタスクを簡素化し、Artefacts プラットフォームと統合するヘルパー関数のコレクションを提供します。

ツールキットは以下のモジュールで構成されています:

インストール

pip install artefacts-toolkit

Artefacts ツールキットを使用したサンプルプロジェクト:

  • Nav2 - Nav2 ナビゲーションスタックと Gazebo を使用したサンプルプロジェクト。

1 - Artefacts ツールキットの設定ヘルパー

Artefacts ツールキットの設定ヘルパーは、 artefacts.yaml Artefacts ツールキットの設定ヘルパーは、

インポート方法:

from artefacts_toolkit.config import get_artefacts_param

関数

関数リファレンス

get_artefacts_param

artefacts.yaml ファイルで設定されたパラメータを返します。 param_typelaunchに設定されている場合、ROS の起動引数として使用できるように string 型で返されます。

この関数はパラメトリックテストに特に役立ちます。 artefacts.yaml ファイルでパラメータ値のリストを定義すると(例:launch/world: ["empty.sdf", "bookstore.sdf", "restaurant.sdf"])、 artefacts run コマンドはリスト内の各値に対して1回ずつ、テストを複数回実行します。各テスト実行中、 get_artefacts_param("launch", "world") は自動的にその特定のテスト実行に対する現在のパラメータ値を返します。

これはグリッドベースのテストに特に強力です!それぞれ3つの可能な値を持つ3つのパラメータがある場合、Artefacts は自動的にテストを27回(3 × 3 × 3)実行しますが、起動ファイルのコードは変更されません。

get_artefacts_param(
    param_type,
    param_name,
    default=None,
    is_ros=True
)

パラメータ

パラメータ 説明 デフォルト
param_type str パラメータの名前空間/カテゴリ(例:「launch/world」の「launch」) 必須
param_name str T特定のパラメータ名(例:「launch/world」の「world」) 必須
default any artefacts.yamlでパラメータが見つからない場合に返す値 None
is_ros bool パラメータをROS パラメータ形式に変換するかどうか True

戻り値

この関数は次の動作でパラメータ値を返します:

  • param_type"launch"is_rosTrueの場合: artefacts.yaml ファイルの元の型に関係なく、値を strとして返します。これはROS起動引数として使用できるようにするためです。
  • defaultNone 以外に設定されている場合、artefacts が要求されたパラメータを見つけられない場合に値が返されます。これは(例えば) artefacts run の代わりに launch_test を使用しているがコード変更を行いたくない場合に便利です。また、 KeyError 例外を防ぎます。
  • その他のすべての場合:YAMLファイルの元の型(例: listdictintfloatstrなど)で値を返します。

以下の設定が設定された artefacts.yaml ファイルがある場合:

scenarios:
  defaults:
    output_dirs: ["output"]
    metrics:
        - /odometry_error
    params:
      launch/world: ["bookstore.sdf", "empty.sdf"]

テスト起動ファイルでパラメータを次のように取得します:

def generate_test_description():
    try:
        world = get_artefacts_param("launch", "world")
    except FileNotFoundError:
        world = "empty.world"

    run_headless = LaunchConfiguration("run_headless")
    launch_navigation_stack = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            [
                os.path.join(
                    get_package_share_directory("sam_bot_nav2_gz"),
                    "launch",
                    "complete_navigation.launch.py"
                ),
            ]
        ),
        launch_arguments=[("run_headless", run_headless), ("world_file", world)],
    )

...# Rest of launch test file   

This will run the same test twice - once in an empty world and once in a bookstore world - without any changes to your launch file code between runs.

2 - Artefacts ツールキットのチャートヘルパー

Artefacts ツールキットのチャートヘルパーは、テスト実行中にトピックから受信したデータを可視化するのに役立つように設計されています。 インポート方法:

from artefacts_toolkit.chart import make_chart

関数

関数リファレンス

make_chart

2つの提供されたトピックからのデータに基づいてインタラクティブなHTMLチャートを作成します。

make_chart(
    filepath,
    topic_x,
    topic_y,
    field_unit=None,
    output_dir="output",
    chart_name="chart",
    file_type="rosbag"
)

Parameters

パラメータ 説明 デフォルト
filepath str データファイル(rosbag)へのパス 必須
topic_x str x軸のトピック名。時間に対してプロットする場合は “Time” を使用 必須
topic_y str y軸のトピック名。時間に対してプロットする場合は “Time” を使用 必須
field_unit str フィールドデータの測定単位(例:「m/s」、「rad」) None
output_dir str チャートが保存されるディレクトリ "output"
chart_name str 生成されるチャートファイルの名前 "chart"
file_type str データファイルのタイプ。現在は “rosbag” をサポート "rosbag"

戻り値

None: output_dir<chart_name>.htmlという HTML チャートを作成しますが、値は返しません。

以下の例では、テストが完了し rosbag が保存された後、すなわちシャットダウン後に make_chart 関数を追加しています。

# my_test_file.launch.py
# test code

...

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_exit_code(self, rosbag_filepath):
        make_chart(
            rosbag_filepath,
            "/odom.pose.pose.position.x",
            "/odom.pose.pose.position.y",
            field_unit="m",
            chart_name="odometry_position",
        )

以下のようなチャートが生成されます:

Example Chart

3 - Artefacts ツールキットの Gazebo ヘルパー

Artefacts ツールキットの Gazebo ヘルパーは、テスト実行中に Gazebo シミュレーションとやり取りするための便利な関数を提供します。これらのユーティリティを使用すると、シミュレーションオブジェクトの検査、モデル位置へのアクセス、および Gazebo と ROS2 間のトピックブリッジングが可能になります。

インポート方法:

from artefacts_toolkit.gazebo import bridge, gz

関数

関数リファレンス

bridge.get_camera_bridge

カメラトピック用の gazebo / ros2 トピックブリッジを作成します。

bridge.get_camera_bridge(
    topic_name,
    condition=None
)

パラメータ

パラメータ 説明 デフォルト
topic_name str Gazebo から ROS2 へブリッジするカメラトピック名 必須
condition str このブリッジを作成するタイミングを決定するオプションの起動条件 None

戻り値

Node:指定されたカメラトピックの parameter_bridge を実行する ROS 2 Node オブジェクトを返します。このノードは起動記述に含めることができます。

次の例では、起動引数に基づいて条件付きで作成されるカメラブリッジを示しています。record_video が “true” に設定されている場合、ブリッジが有効になり、ROS 2 ノードが Gazebo からのカメラ画像をサブスクライブできるようになります。返された Node は、起動プロセスに含めるために LaunchDescription に追加されます。

@pytest.mark.launch_test
def generate_test_description():
    from artefacts_toolkit.gazebo import bridge
    ...

    camera_topic = "/observation_camera/image"
    bag_recorder, rosbag_filepath = get_bag_recorder([camera_topic])
    sim = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(["bringup", "/robot.launch.py"])
    )

    record_video_launch_arg = DeclareLaunchArgument(
        "record_video", default_value="true"
    )
    record_video = LaunchConfiguration("record_video")
    camera_bridge = bridge.get_camera_bridge(camera_topic, condition=IfCondition(record_video))
    
    return LaunchDescription(
        [
            record_video_launch_arg,
            sim,
            camera_bridge,
            controller_process,
            launch_testing.actions.ReadyToTest(),
        ]
    )

gz.get_sim_objects

XML 構造を解析することで Gazebo ワールドファイルからモデル情報を抽出します。この関数は、ワールドファイルで定義されているすべてのモデルの名前と元のポーズを返します。

これは、実行中のシミュレーションに問い合わせる(物理、ランダム性、または相互作用によって位置が変わっている可能性がある)よりも、ワールドファイルで定義されているモデルの初期ポーズを知る必要がある場合に便利です。

gz.get_sim_objects(world_file)

パラメータ

パラメータ 説明 デフォルト
world_file str 解析する Gazebo ワールドファイルへのパス 必須

戻り値

tuple: この関数は、アンパックする必要がある2つの値を持つタプルを返します:

  • 辞書のリスト:
objects = [
    {
        "name": "model_1",
        "pose": "0 0 0 0 0 0"
    }, 
    {
        "name": "model_2",
        "pose": "1 2 3 0 0 0"
    },
    ...
]
  • モデル名からポーズへのマッピング辞書:
objects_positions = {
    "model_1": "0 0 0 0 0 0", 
    "model_2": "1 2 3 0 0 0",
    ...
}

# Working with the objects_position dict
_, objects_positions = gz.get_sim_objects("world.sdf")
model_pose = objects_positions["model_1"]  # "0 0 0 0 0 0"

# Working with the objects list
objects, _ = gz.get_sim_objects("world.sdf")
for model in objects:
    print(f"{model['name']} is at position {model['pose']}")

gz.get_model_location

実行中の Gazebo シミュレーションでのモデルの現在の (x, y, z) 位置を取得します。ワールドファイルから元の位置を読み取る get_sim_objects とは異なり、この関数はライブシミュレーションに問い合わせてモデルの現在の位置を取得します。

gz.get_model_location(model_name)

パラメータ

パラメータ 説明 デフォルト
model_name str シミュレーションで問い合わせるモデルの名前 必須

戻り値

tuple:メートル単位の (x, y, z) 位置を表す3つの浮動小数点値のタプルを返します:

(x_position, y_position, z_position)

以下の例は、ピック&プレースタスクを検証するために get_sim_objectsget_model_location を組み合わせる方法を示しています。次の方法を示しています:

  1. ワールドファイルから初期位置を取得する
  2. ロボットがタスクを完了するのを待つ
  3. 初期位置と現在の位置を比較して、どのオブジェクトが移動したかを検出する
class TestProcOutput(unittest.TestCase):
    def test_moved_meatballs(self, proc_output, controller_process):
        # Original locations
        sim_objects, sim_objects_positions = gz.get_sim_objects("worlds/env.sdf") # this includes models poses
        meatball_models = [obj["name"] for obj in sim_objects if "karaage" in obj["name"]]
        #  Wait for the control loop to finish
        proc_output.assertWaitFor("Done with tasks execution", timeout=300)
        picked_meatballs = 0 # karaage moved for more than 10cm
        for meatball in meatball_models:
            x, y, z = gz.get_model_location(meatball)
            x_original, y_original, z_original = sim_objects_positions[meatball]
            dist = ((x - x_original) ** 2 + (y - y_original) ** 2 + (z - z_original) ** 2) ** 0.5
            if dist > 0.1: #10cm
                # Likely to have been picked and moved somewhere else
                picked_meatballs += 1
        self.assertEqual(picked_meatballs, 4)

gz.kill_gazebo

現在実行中の gazebo プロセスを強制終了します。

gz.kill_gazebo()

戻り値

None: この関数は値を返しません。

from artefacts_toolkit.gazebo import gz
...

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_exit_code(self, proc_info, controller_process, rosbag_filepath):
        gz.kill_gazebo()
        ...

4 - Artefacts ツールキットの Rosbag ヘルパー

Artefacts ツールキットの Rosbag ヘルパーは、テスト実行中に ROS バッグファイルを作成、記録、およびデータを抽出するための便利な関数を提供します。これらのユーティリティは、分析と可視化のために処理されるトピックデータをキャプチャするのに役立ちます。

インポート方法:

from artefacts_toolkit.rosbag import get_bag_recorder, image_topics, message_topics

関数

関数リファレンス

get_bag_recorder

指定されたトピック名のリストに対する rosbag2 レコーダーを作成し、ノードとファイルパスを返します。

rosbag.get_bag_recorder(
    topic_names,
    directory="rosbags",
    use_sim_time=False
)

パラメータ

パラメータ 説明 デフォルト
topic_names list[str] 記録する ROS トピックのリスト  必須
directory str rosbag が保存されるディレクトリ "rosbags"
use_sim_time bool システム時間の代わりにシミュレーション時間を使用するかどうか False

戻り値

tuple: 以下を含むタプルを返します:

  • bag_recorder(ExecuteProcess): レコーダープロセスを実行する Launch Action。
  • rosbag_filepath(str): 作成される rosbag ファイルへのパス。

以下の例では、 bag_recorder ヘルパーを使用して rosbag を記録し、 bag_recorder を起動記述に追加し、後で rosbag_filepath をアサーションテスト(別の rosbag ヘルパー get_final_messageを使用)に使用するテスト起動ファイルを示しています:

@pytest.mark.launch_test
def generate_test_description():
    camera_topics = ["/depth_cam/rgb"]
    extra_topics = ["/odom", "/noisy_estimate"]

    bag_recorder, rosbag_filepath = rosbag.get_bag_recorder(
        camera_topics + extra_topics, use_sim_time=False
    )

    test_odometry_node = ExecuteProcess(
        cmd=[
            "python3",
            "src/test_odometry_node.py",
        ]
    )
    return LaunchDescription(
        [
            test_odometry_node,
            launch_testing.actions.ReadyToTest(),
            bag_recorder,
        ]
    ), {
        "test_odometry_node": test_odometry_node,
        "rosbag_filepath": rosbag_filepath,
    }


@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )

image_topics.extract_camera_image

提供されたカメラトピックから最後に記録された画像を返します。

image_topics.extract_camera_image(
    rosbag_file_path,
    camera_topic,
    output_dir="output"
)

パラメータ

パラメータ 説明 デフォルト
rosbag_file_path str 記録された rosbag へのパス  必須
camera_topic str 画像を取得する ROS カメラトピックの名前 必須
output_dir str 抽出された画像を保存するディレクトリ "output"

戻り値

None: 画像で指定された output_dir に保存されます。

以下の例では、テスト終了後に rosbag から最後のカメラ画像を抽出する方法を示しています。 rosbag.get_rosbag_recorder 関数によって返された rosbag_filepath を使用します。

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_camera_image(rosbag_filepath, "/depth_cam/rgb")

image_topics.extract_video

提供されたカメラトピックからすべての画像を組み合わせて WebM ビデオを作成します。

image_topics.extract_video(
    bag_path,
    topic_name,
    output_path,
    frame_rate=20
)

パラメータ

パラメータ 説明 デフォルト
bag_path str 記録された rosbag へのパス  必須
topic_name str ビデオを作成する ROS カメラトピックの名前 必須
output_path str ビデオが保存されるパス (.webm)        必須
frame_rate int 作成されるビデオに使用するフレームレート 20

戻り値

None: ビデオファイルは指定された output_path に保存されます。

注意

以下の例では、テスト終了後に rosbag からビデオを抽出する方法を示していますrosbag.get_rosbag_recorder 関数によって返された rosbag_filepath を使用します。

from artefacts_toolkit.rosbag import rosbag, image_topics
def test_exit_code(
    self, proc_info, test_odometry_node, rosbag_filepath
):
    ...

    image_topics.extract_video(rosbag_filepath, "/depth_cam/rgb", "output/depth_cam.webm")

message_topics.get_final_message

rosbag 内の指定されたトピックから最終メッセージを取得します。オプションでドット表記を使用した属性アクセスが可能です。

message_topics.get_final_message(
    rosbag_filepath,
    topic
)

パラメータ

パラメータ 説明 デフォルト
rosbag_filepath str 記録された rosbag へのパス 必須
topic str トピック名。ドット表記を使用してメッセージ属性を掘り下げます(例:"/distance.data")  必須

戻り値

Any: 指定されたメッセージ属性の値を返します。型はアクセスされるフィールドによって異なります。

rosbag.get_bag_recorder の説明と同じ例を使用します。アサーションテストを行うために distance_from_start トピックからデータを取得していることに注目してください。

@launch_testing.post_shutdown_test()
class TestProcOutputAfterShutdown(unittest.TestCase):
    def test_end_position(self, rosbag_filepath):
        final_distance_from_start = message_topics.get_final_message(
            rosbag_filepath, "/distance_from_start.data"
        )

        assert final_distance_from_start < 0.1, (
            f"Final distance from start is {final_distance_from_start}, expected less than 10cm"
        )