Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,8 @@ pyrightconfig.json
# camera
camera/*

# map
map/*

# tests
tests/
20 changes: 20 additions & 0 deletions map_specifications/YOUR_MAP_NAME.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Semantic Map Template
# This file defines the semantic information of the environment so that the robot can recognize important locations and navigate between them.

# - locations: A list of key points that the robot should be aware of
# · name: Name of the location
# · description: Brief description of the location
# · pose: Position and orientation within the map frame
# - x, y: 2D position in the map coordinate system
# - yaw: Orientation in radians (0 points along the positive X-axis)

name: YOUR_MAP_NAME
alias: YOUR_MAP_ALIAS # Optional alias for easier reference
type: 2D_OccupancyGrid # Map format (e.g., 2D_OccupancyGrid)
locations:
- name: Origin
description: Origin of your map
pose:
x: 0.0
y: 0.0
yaw: 0.0
199 changes: 197 additions & 2 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
from PIL import Image as PILImage

from utils.config_utils import get_verified_robot_spec_util, get_verified_robots_list_util
from utils.map_utils import (
draw_map_axes_util,
get_map_info_list_util,
get_map_info_util,
write_map_location_util,
)
from utils.network_utils import ping_ip_and_port
from utils.websocket_manager import WebSocketManager, parse_input

Expand Down Expand Up @@ -618,7 +624,7 @@ def subscribe_once(
expects_image_bool = convert_expects_image_hint(expects_image)

# Parse input with expects_image hint
msg_data, was_parsed_as_image = parse_input(response, expects_image_bool)
msg_data, was_parsed_as_image = parse_input(response, expects_image_bool, msg_type)

if not msg_data:
continue # parsing failed or empty
Expand Down Expand Up @@ -816,7 +822,7 @@ def subscribe_for_duration(
expects_image_bool = convert_expects_image_hint(expects_image)

# Parse input with expects_image hint
msg_data, was_parsed_as_image = parse_input(response, expects_image_bool)
msg_data, was_parsed_as_image = parse_input(response, expects_image_bool, msg_type)

if not msg_data:
continue # parsing failed or empty
Expand Down Expand Up @@ -3051,6 +3057,195 @@ def parse_arguments():
return parser.parse_args()


## ############################################################################################## ##
##
## MAP ANALYSIS
##
## ############################################################################################## ##
@mcp.tool(
description=(
"Draw coordinate axes on a previously saved OccupancyGrid map image. "
"The tool uses the map metadata (width, height, resolution, origin) and the "
"image path to overlay +X and +Y axes starting from the world origin position (0,0) "
"in image coordinates, then saves and returns the path to the annotated image."
)
)
def draw_map_axes(
map_message: Dict[str, Any],
line_thickness: int = 2,
) -> Dict[str, Any]:
"""
Overlay coordinate axes and a world-aligned grid on a map image
using OccupancyGrid metadata.

Args:
map_message (dict):
A dictionary that must contain:
- an 'info' field with:
- 'width' (int): map width in pixels.
- 'height' (int): map height in pixels.
- 'resolution' (float): map resolution in meters per pixel.
- 'origin.position.x' (float): origin X in map frame (meters).
- 'origin.position.y' (float): origin Y in map frame (meters).
The dictionary may optionally be wrapped inside a top-level 'msg' field.

line_thickness (int):
Thickness of the axis lines in image pixels. Larger values make the
rendered axes visually thicker and easier to see on high-resolution maps.

Returns:
dict:
On success:
{
"annotated_map_path": "<path to PNG with axes and grid>",
"width": <int>,
"height": <int>,
"resolution": <float>,
"axis_length_m": <float>,
"grid_spacing_m": <float>,
"message": "Axes and grid drawn successfully."
}

On failure:
{
"error": "<reason string>"
}
"""
return draw_map_axes_util(map_message, line_thickness)


@mcp.tool(
description=(
"Load specifications and usage context for a verified map information. "
"ONLY use if the map information is in the verified list (use get_verified_maps_list first to check). "
)
)
def get_verified_maps_info(name: str) -> dict:
"""
Load pre-defined specifications and additional context for a map information.

This is OPTIONAL - only for a small set of pre-verified map information stored in the repository.
Use get_verified_maps_list() first to check if a spec exists.

Args:
name (str): The exact map name from the verified list.

Returns:
dict: The map specification with type, prompts, and additional context.
"""
robot_config = get_map_info_util(name)

if len(robot_config) > 1:
return {
"error": f"Multiple configurations found for map '{name}'. Please specify a more precise name."
}
elif not robot_config:
return {
"error": f"No configuration found for map '{name}'. Please check the name and try again."
}
return {"robot_config": robot_config}


@mcp.tool(
description=(
"List pre-verified maps information that have specification files with usage guidance available. "
"Use this to check if a map information has additional context available before calling get_verified_maps_info. "
)
)
def get_verified_maps_list() -> dict:
"""
List all pre-verified map information that have specification files available in the repository.

This is a small curated list of map information with pre-defined specifications.

Returns:
dict: List of available verified map information names and count.
"""
return get_map_info_list_util()


@mcp.tool(
description=(
"Add or update a semantic location in a specific map YAML file under "
"map_specifications (e.g., hospital.yaml, office.yaml). "
"Use this after determining the correct map name via get_verified_maps_info / get_verified_maps_list."
)
)
def write_map_location(
map_name: str,
name: str,
description: str,
x: float,
y: float,
yaw: float,
) -> dict:
"""
Add a new location to semantic_map.yaml or update an existing one
if a location with the same name already exists.

Args:
name (str): The semantic location name (e.g., 'AED').
description (str): Description of the location.
x (float): X coordinate.
y (float): Y coordinate.
yaw (float): Orientation in radians.

Returns:
dict: Status and the added/updated location info.
"""
return write_map_location_util(map_name, name, description, x, y, yaw)


@mcp.tool(
description=(
"Analyze a previously received occupancy grid map after it has been processed into a visual overlay image."
"Map data may come from nav_msgs/OccupancyGrid topics, services, or from subscribe_once() / subscribe_for_duration() operations. "
"Always applies draw_map_axes() to generate an annotated overlay with axes and grid lines."
"During analysis, treat the drawn axes as the true world-frame origin (0,0)."
"Use the grid spacing provided by draw_map_axes() and count grid cells accurately to convert pixel positions into precise real-world coordinates."
"Use this tool after receiving a map to inspect the processed overlay of the latest environment."
"In an OccupancyGrid map, white represents free navigable space, black represents obstacles like walls, and gray represents unexplored areas that should not be considered in the analysis."
)
)
def analyze_previously_received_map():
"""
Analyze the previously processed occupancy grid overlay stored in ./map/received_map_overlay.png.

This tool loads the final overlay image generated from the most recently received
OccupancyGrid message. The map-processing pipeline converts the raw occupancy grid
into a PNG image and always applies draw_map_axes() to add coordinate axes and grid lines
for clear visual interpretation.

Overlay map images may come from:
- Any topic publishing OccupancyGrid data
- Any service returning map data
- subscribe_once() or subscribe_for_duration() operations
- draw_map_axes() operations (automatically applied before analysis)

In the overlay:
- The red axis represents the +X direction.
- The green axis represents the +Y direction.
- The point where the two axes meet is the true world-frame origin (0,0).

In an OccupancyGrid map:
- White = Free space (areas the robot can navigate)
- Black = Obstacles (walls, objects)
- Gray = Unexplored/unknown areas
- Do not consider the gray unexplored areas.

Use the grid spacing provided by draw_map_axes(), and count grid cells accurately
to convert pixel positions into precise real-world coordinates based on the axes.

This tool then returns the overlay as an MCP-compatible ImageContent format so
the LLM can visually interpret the environment without handling raw grid data.
"""
path = "./map/received_map_overlay.png"
if not os.path.exists(path):
return {"error": "No image found at ./map/received_map_overlay.png"}
img = PILImage.open(path)
return _encode_image_to_imagecontent(img)


def main():
"""Main entry point for the MCP server console script."""
# Parse command line arguments
Expand Down
Loading