diff --git a/.gitignore b/.gitignore index 3220b7a..a2d5c2d 100644 --- a/.gitignore +++ b/.gitignore @@ -180,5 +180,8 @@ pyrightconfig.json # camera camera/* +# map +map/* + # tests tests/ \ No newline at end of file diff --git a/map_specifications/YOUR_MAP_NAME.yaml b/map_specifications/YOUR_MAP_NAME.yaml new file mode 100644 index 0000000..9e94fe1 --- /dev/null +++ b/map_specifications/YOUR_MAP_NAME.yaml @@ -0,0 +1,20 @@ +# Semantic Map Template +# This file defines the semantic information of the environment so that the robot can recognize important locations and navigate between them. + +# - locations: A list of key points that the robot should be aware of +# · name: Name of the location +# · description: Brief description of the location +# · pose: Position and orientation within the map frame +# - x, y: 2D position in the map coordinate system +# - yaw: Orientation in radians (0 points along the positive X-axis) + +name: YOUR_MAP_NAME +alias: YOUR_MAP_ALIAS # Optional alias for easier reference +type: 2D_OccupancyGrid # Map format (e.g., 2D_OccupancyGrid) +locations: + - name: Origin + description: Origin of your map + pose: + x: 0.0 + y: 0.0 + yaw: 0.0 diff --git a/server.py b/server.py index 4106566..f98fb2f 100644 --- a/server.py +++ b/server.py @@ -13,6 +13,12 @@ from PIL import Image as PILImage from utils.config_utils import get_verified_robot_spec_util, get_verified_robots_list_util +from utils.map_utils import ( + draw_map_axes_util, + get_map_info_list_util, + get_map_info_util, + write_map_location_util, +) from utils.network_utils import ping_ip_and_port from utils.websocket_manager import WebSocketManager, parse_input @@ -618,7 +624,7 @@ def subscribe_once( expects_image_bool = convert_expects_image_hint(expects_image) # Parse input with expects_image hint - msg_data, was_parsed_as_image = parse_input(response, expects_image_bool) + msg_data, was_parsed_as_image = parse_input(response, expects_image_bool, msg_type) if not msg_data: continue # parsing failed or empty @@ -816,7 +822,7 @@ def subscribe_for_duration( expects_image_bool = convert_expects_image_hint(expects_image) # Parse input with expects_image hint - msg_data, was_parsed_as_image = parse_input(response, expects_image_bool) + msg_data, was_parsed_as_image = parse_input(response, expects_image_bool, msg_type) if not msg_data: continue # parsing failed or empty @@ -3051,6 +3057,195 @@ def parse_arguments(): return parser.parse_args() +## ############################################################################################## ## +## +## MAP ANALYSIS +## +## ############################################################################################## ## +@mcp.tool( + description=( + "Draw coordinate axes on a previously saved OccupancyGrid map image. " + "The tool uses the map metadata (width, height, resolution, origin) and the " + "image path to overlay +X and +Y axes starting from the world origin position (0,0) " + "in image coordinates, then saves and returns the path to the annotated image." + ) +) +def draw_map_axes( + map_message: Dict[str, Any], + line_thickness: int = 2, +) -> Dict[str, Any]: + """ + Overlay coordinate axes and a world-aligned grid on a map image + using OccupancyGrid metadata. + + Args: + map_message (dict): + A dictionary that must contain: + - an 'info' field with: + - 'width' (int): map width in pixels. + - 'height' (int): map height in pixels. + - 'resolution' (float): map resolution in meters per pixel. + - 'origin.position.x' (float): origin X in map frame (meters). + - 'origin.position.y' (float): origin Y in map frame (meters). + The dictionary may optionally be wrapped inside a top-level 'msg' field. + + line_thickness (int): + Thickness of the axis lines in image pixels. Larger values make the + rendered axes visually thicker and easier to see on high-resolution maps. + + Returns: + dict: + On success: + { + "annotated_map_path": "", + "width": , + "height": , + "resolution": , + "axis_length_m": , + "grid_spacing_m": , + "message": "Axes and grid drawn successfully." + } + + On failure: + { + "error": "" + } + """ + return draw_map_axes_util(map_message, line_thickness) + + +@mcp.tool( + description=( + "Load specifications and usage context for a verified map information. " + "ONLY use if the map information is in the verified list (use get_verified_maps_list first to check). " + ) +) +def get_verified_maps_info(name: str) -> dict: + """ + Load pre-defined specifications and additional context for a map information. + + This is OPTIONAL - only for a small set of pre-verified map information stored in the repository. + Use get_verified_maps_list() first to check if a spec exists. + + Args: + name (str): The exact map name from the verified list. + + Returns: + dict: The map specification with type, prompts, and additional context. + """ + robot_config = get_map_info_util(name) + + if len(robot_config) > 1: + return { + "error": f"Multiple configurations found for map '{name}'. Please specify a more precise name." + } + elif not robot_config: + return { + "error": f"No configuration found for map '{name}'. Please check the name and try again." + } + return {"robot_config": robot_config} + + +@mcp.tool( + description=( + "List pre-verified maps information that have specification files with usage guidance available. " + "Use this to check if a map information has additional context available before calling get_verified_maps_info. " + ) +) +def get_verified_maps_list() -> dict: + """ + List all pre-verified map information that have specification files available in the repository. + + This is a small curated list of map information with pre-defined specifications. + + Returns: + dict: List of available verified map information names and count. + """ + return get_map_info_list_util() + + +@mcp.tool( + description=( + "Add or update a semantic location in a specific map YAML file under " + "map_specifications (e.g., hospital.yaml, office.yaml). " + "Use this after determining the correct map name via get_verified_maps_info / get_verified_maps_list." + ) +) +def write_map_location( + map_name: str, + name: str, + description: str, + x: float, + y: float, + yaw: float, +) -> dict: + """ + Add a new location to semantic_map.yaml or update an existing one + if a location with the same name already exists. + + Args: + name (str): The semantic location name (e.g., 'AED'). + description (str): Description of the location. + x (float): X coordinate. + y (float): Y coordinate. + yaw (float): Orientation in radians. + + Returns: + dict: Status and the added/updated location info. + """ + return write_map_location_util(map_name, name, description, x, y, yaw) + + +@mcp.tool( + description=( + "Analyze a previously received occupancy grid map after it has been processed into a visual overlay image." + "Map data may come from nav_msgs/OccupancyGrid topics, services, or from subscribe_once() / subscribe_for_duration() operations. " + "Always applies draw_map_axes() to generate an annotated overlay with axes and grid lines." + "During analysis, treat the drawn axes as the true world-frame origin (0,0)." + "Use the grid spacing provided by draw_map_axes() and count grid cells accurately to convert pixel positions into precise real-world coordinates." + "Use this tool after receiving a map to inspect the processed overlay of the latest environment." + "In an OccupancyGrid map, white represents free navigable space, black represents obstacles like walls, and gray represents unexplored areas that should not be considered in the analysis." + ) +) +def analyze_previously_received_map(): + """ + Analyze the previously processed occupancy grid overlay stored in ./map/received_map_overlay.png. + + This tool loads the final overlay image generated from the most recently received + OccupancyGrid message. The map-processing pipeline converts the raw occupancy grid + into a PNG image and always applies draw_map_axes() to add coordinate axes and grid lines + for clear visual interpretation. + + Overlay map images may come from: + - Any topic publishing OccupancyGrid data + - Any service returning map data + - subscribe_once() or subscribe_for_duration() operations + - draw_map_axes() operations (automatically applied before analysis) + + In the overlay: + - The red axis represents the +X direction. + - The green axis represents the +Y direction. + - The point where the two axes meet is the true world-frame origin (0,0). + + In an OccupancyGrid map: + - White = Free space (areas the robot can navigate) + - Black = Obstacles (walls, objects) + - Gray = Unexplored/unknown areas + - Do not consider the gray unexplored areas. + + Use the grid spacing provided by draw_map_axes(), and count grid cells accurately + to convert pixel positions into precise real-world coordinates based on the axes. + + This tool then returns the overlay as an MCP-compatible ImageContent format so + the LLM can visually interpret the environment without handling raw grid data. + """ + path = "./map/received_map_overlay.png" + if not os.path.exists(path): + return {"error": "No image found at ./map/received_map_overlay.png"} + img = PILImage.open(path) + return _encode_image_to_imagecontent(img) + + def main(): """Main entry point for the MCP server console script.""" # Parse command line arguments diff --git a/utils/config_utils.py b/utils/config_utils.py index 0f83b86..d4eda2b 100644 --- a/utils/config_utils.py +++ b/utils/config_utils.py @@ -86,3 +86,178 @@ def get_verified_robots_list_util() -> dict: except Exception as e: return {"error": f"Failed to read robot specifications directory: {str(e)}"} + + +def load_map_config(map_name: str, specs_dir: str) -> dict: + """ + Load the map configuration from a YAML file by map name. + + Args: + map_name (str): The name of the map. + specs_dir (str): Directory containing map information files. + + Returns: + dict: The map configuration. + + Raises: + FileNotFoundError: If the YAML file does not exist. + """ + file_path = Path(specs_dir) / f"{map_name}.yaml" + + if not file_path.exists(): + raise FileNotFoundError(f"map config file not found: {file_path}") + + with file_path.open("r") as file: + return yaml.safe_load(file) or {} + + +def get_map_info_util(name: str) -> dict: + """ + Get the map information in a more accessible format. + + Args: + name (str): The name of the map. + + Returns: + dict: Get map information with map name as key. + """ + # Resolve relative to the project root (one level up from utils) + specs_dir = Path(__file__).parent.parent / "map_specifications" + + name = name.replace(" ", "_") + config = load_map_config(name, str(specs_dir)) + parsed_config = {} + + # Check if the loaded config has the required fields + if not config: + raise ValueError(f"No configuration found for map '{name}'") + + # Check required fields + for field in ("type", "locations"): + if field not in config or config[field] in (None, ""): + raise ValueError(f"Map '{name}' is missing required field: {field}") + + # Create configuration with robot name as key + parsed_config[name] = {"type": config["type"], "locations": config["locations"]} + + return parsed_config + + +def get_map_info_list_util() -> dict: + """ + Get a list of all available map specification files. + + Returns: + dict: List of available map names that can be used with get_map_info_util. + """ + # Resolve relative to the project root (one level up from utils) + specs_path = Path(__file__).parent.parent / "map_specifications" + + if not specs_path.exists(): + return {"error": f"Map specifications directory not found: {specs_path}"} + + try: + # Find all YAML files in the specifications directory + yaml_files = list(specs_path.glob("*.yaml")) + + if not yaml_files: + return {"error": "No map specification files found"} + + # Extract robot names (file names without .yaml extension) + map_names = [file.stem for file in yaml_files] + map_names.sort() # Sort alphabetically for consistency + + return {"map_specifications": map_names, "count": len(map_names)} + + except Exception as e: + return {"error": f"Failed to read map specifications directory: {str(e)}"} + + +def write_map_location_util( + map_name: str, # e.g. "hospital", "office" + name: str, # e.g. "AED" + description: str, + x: float, + y: float, + yaw: float, +) -> dict: + """ + Add or update a location entry in map_specifications/{map_name}.yaml. + + Args: + map_name (str): Map name without extension (e.g., 'hospital', 'office'). + name (str): Location name (e.g., 'AED'). + description (str): Description of the location. + x (float): X coordinate. + y (float): Y coordinate. + yaw (float): Orientation in radians. + + Returns: + dict: Status, which file was modified, and the added/updated location. + """ + try: + # Resolve specs directory (same as get_map_info_util) + specs_dir = Path(__file__).parent.parent / "map_specifications" + + if not specs_dir.exists(): + return {"error": f"Map specifications directory not found: {specs_dir}"} + + # Build the YAML file path for the selected map + safe_map_name = map_name.replace(" ", "_") + map_file = specs_dir / f"{safe_map_name}.yaml" + + if not map_file.exists(): + return {"error": f"Map file not found: {map_file}"} + + # Load existing YAML + with map_file.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + + # Ensure 'locations' exists + if "locations" not in data or data["locations"] is None: + data["locations"] = [] + + locations = data["locations"] + + # Build the new/updated location entry + new_location = { + "name": name, + "description": description, + "pose": { + "x": float(x), + "y": float(y), + "yaw": float(yaw), + }, + } + + # If a location with the same name exists → update it, else append + action = "added" + for idx, loc in enumerate(locations): + if loc.get("name") == name: + locations[idx] = new_location + action = "updated" + break + else: + locations.append(new_location) + + data["locations"] = locations + + # Write back to the same YAML file + with map_file.open("w", encoding="utf-8") as f: + yaml.safe_dump( + data, + f, + sort_keys=False, + allow_unicode=True, + ) + + return { + "status": "success", + "action": action, + "map_name": safe_map_name, + "file": str(map_file), + "location": new_location, + } + + except Exception as e: + return {"error": f"Failed to write semantic location: {str(e)}"} diff --git a/utils/map_utils.py b/utils/map_utils.py new file mode 100644 index 0000000..7c6fd82 --- /dev/null +++ b/utils/map_utils.py @@ -0,0 +1,406 @@ +import math +import os +import sys +from pathlib import Path +from typing import Any, Dict + +import cv2 +import yaml + + +def load_map_config(map_name: str, specs_dir: str) -> dict: + """ + Load the map configuration from a YAML file by map name. + + Args: + map_name (str): The name of the map. + specs_dir (str): Directory containing map information files. + + Returns: + dict: The map configuration. + + Raises: + FileNotFoundError: If the YAML file does not exist. + """ + file_path = Path(specs_dir) / f"{map_name}.yaml" + + if not file_path.exists(): + raise FileNotFoundError(f"map config file not found: {file_path}") + + with file_path.open("r") as file: + return yaml.safe_load(file) or {} + + +def get_map_info_util(name: str) -> dict: + """ + Get the map information in a more accessible format. + + Args: + name (str): The name of the map. + + Returns: + dict: Get map information with map name as key. + """ + # Resolve relative to the project root (one level up from utils) + specs_dir = Path(__file__).parent.parent / "map_specifications" + + name = name.replace(" ", "_") + config = load_map_config(name, str(specs_dir)) + parsed_config = {} + + # Check if the loaded config has the required fields + if not config: + raise ValueError(f"No configuration found for map '{name}'") + + # Check required fields + for field in ("type", "locations"): + if field not in config or config[field] in (None, ""): + raise ValueError(f"Map '{name}' is missing required field: {field}") + + # Create configuration with robot name as key + parsed_config[name] = {"type": config["type"], "locations": config["locations"]} + + return parsed_config + + +def get_map_info_list_util() -> dict: + """ + Get a list of all available map specification files. + + Returns: + dict: List of available map names that can be used with get_map_info_util. + """ + # Resolve relative to the project root (one level up from utils) + specs_path = Path(__file__).parent.parent / "map_specifications" + + if not specs_path.exists(): + return {"error": f"Map specifications directory not found: {specs_path}"} + + try: + # Find all YAML files in the specifications directory + yaml_files = list(specs_path.glob("*.yaml")) + + if not yaml_files: + return {"error": "No map specification files found"} + + # Extract robot names (file names without .yaml extension) + map_names = [file.stem for file in yaml_files] + map_names.sort() # Sort alphabetically for consistency + + return {"map_specifications": map_names, "count": len(map_names)} + + except Exception as e: + return {"error": f"Failed to read map specifications directory: {str(e)}"} + + +def write_map_location_util( + map_name: str, # e.g. "hospital", "office" + name: str, # e.g. "AED" + description: str, + x: float, + y: float, + yaw: float, +) -> dict: + """ + Add or update a location entry in map_specifications/{map_name}.yaml. + + Args: + map_name (str): Map name without extension (e.g., 'hospital', 'office'). + name (str): Location name (e.g., 'AED'). + description (str): Description of the location. + x (float): X coordinate. + y (float): Y coordinate. + yaw (float): Orientation in radians. + + Returns: + dict: Status, which file was modified, and the added/updated location. + """ + try: + # 1) Resolve specs directory (same as get_map_info_util) + specs_dir = Path(__file__).parent.parent / "map_specifications" + + if not specs_dir.exists(): + return {"error": f"Map specifications directory not found: {specs_dir}"} + + # 2) Build the YAML file path for the selected map + # Example: hospital -> map_specifications/hospital.yaml + safe_map_name = map_name.replace(" ", "_") + map_file = specs_dir / f"{safe_map_name}.yaml" + + if not map_file.exists(): + return {"error": f"Map file not found: {map_file}"} + + # 3) Load existing YAML + with map_file.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + + # 4) Ensure 'locations' exists + if "locations" not in data or data["locations"] is None: + data["locations"] = [] + + locations = data["locations"] + + # 5) Build the new/updated location entry + new_location = { + "name": name, + "description": description, + "pose": { + "x": float(x), + "y": float(y), + "yaw": float(yaw), + }, + } + + # 6) If a location with the same name exists → update it, else append + action = "added" + for idx, loc in enumerate(locations): + if loc.get("name") == name: + locations[idx] = new_location + action = "updated" + break + else: + locations.append(new_location) + + data["locations"] = locations + + # 7) Write back to the same YAML file + with map_file.open("w", encoding="utf-8") as f: + yaml.safe_dump( + data, + f, + sort_keys=False, + allow_unicode=True, + ) + + return { + "status": "success", + "action": action, + "map_name": safe_map_name, + "file": str(map_file), + "location": new_location, + } + + except Exception as e: + return {"error": f"Failed to write semantic location: {str(e)}"} + + +def draw_map_axes_util( + map_message: Dict[str, Any], + line_thickness: int = 2, +) -> Dict[str, Any]: + """ + Overlay coordinate axes and a world-aligned grid on a map image + using OccupancyGrid metadata. + + Args: + map_message (dict): + A dictionary that must contain: + - an 'info' field with: + - 'width' (int): map width in pixels. + - 'height' (int): map height in pixels. + - 'resolution' (float): map resolution in meters per pixel. + - 'origin.position.x' (float): origin X in map frame (meters). + - 'origin.position.y' (float): origin Y in map frame (meters). + The dictionary may optionally be wrapped inside a top-level 'msg' field. + + line_thickness (int): + Thickness of the axis lines in image pixels. Larger values make the + rendered axes visually thicker and easier to see on high-resolution maps. + + Returns: + dict: + On success: + { + "annotated_map_path": "", + "width": , + "height": , + "resolution": , + "axis_length_m": , + "grid_spacing_m": , + "world_bounds": { + "x": [, ], + "y": [, ] + } + "message": "Axes and grid drawn successfully." + } + + On failure: + { + "error": "" + } + """ + try: + # Allow both {"msg": {...}} or direct message dict + msg = map_message.get("msg", map_message) + if not isinstance(msg, dict): + return {"error": "Invalid map_message: expected a dict or a dict with a 'msg' field."} + + info = msg.get("info", {}) + + # Use fixed path as you designed earlier + map_image_path = os.path.join("./map", "received_map.png") + + width = info.get("width") + height = info.get("height") + resolution = info.get("resolution") + origin_pos = info.get("origin", {}).get("position", {}) + + if width is None or height is None or resolution is None: + return {"error": "Missing width, height, or resolution in map metadata."} + + # Load the base map image + img = cv2.imread(map_image_path, cv2.IMREAD_COLOR) + if img is None: + return {"error": f"Failed to load map image from path: {map_image_path}"} + + h, w = img.shape[:2] + + # If the actual image size differs from the metadata, prefer the image size + if w != int(width) or h != int(height): + print( + f"[draw_map_axes] Warning: image size ({w}x{h}) does not match " + f"metadata ({width}x{height}). Using image size for drawing.", + file=sys.stderr, + ) + width = w + height = h + + # Compute the physical map size in meters + map_width_m = float(width) * float(resolution) + map_height_m = float(height) * float(resolution) + + # Automatically determine axis length as 20% of the shorter map dimension + shortest_side_m = min(map_width_m, map_height_m) + axis_length_m = max(shortest_side_m * 0.2, resolution) # at least one pixel + axis_length_px = max(int(axis_length_m / float(resolution)), 1) + + # Compute image pixel of world (0,0) + grid_origin_x = float(origin_pos.get("x", 0.0)) + grid_origin_y = float(origin_pos.get("y", 0.0)) + + # World(0,0) → image pixel(u,v) + # u increases to the right, v increases downward in image space. + origin_u = int(round((0.0 - grid_origin_x) / float(resolution))) + origin_v = int(round(height - (0.0 - grid_origin_y) / float(resolution))) + + # Clamp within image boundaries + origin_u = max(0, min(width - 1, origin_u)) + origin_v = max(0, min(height - 1, origin_v)) + + # Choose a "nice" grid spacing in meters so that the number of lines is reasonable. + # Target ~10-20 grid cells across the shortest side. + target_cells = 15.0 + base_spacing = max(shortest_side_m / target_cells, resolution) + + # Snap to a human-friendly spacing (0.1, 0.2, 0.5, 1, 2, 5, 10, 20, ...) + nice_steps = [0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0] + grid_spacing_m = nice_steps[-1] + for step in nice_steps: + if step >= base_spacing: + grid_spacing_m = step + break + + # World extents covered by the map + x_min = grid_origin_x + x_max = grid_origin_x + map_width_m + y_min = grid_origin_y + y_max = grid_origin_y + map_height_m + + # Vertical grid lines (constant x = k * grid_spacing_m) + # Find k range such that lines fall inside [x_min, x_max] + if grid_spacing_m > 0: + k_start_x = math.ceil(x_min / grid_spacing_m) + k_end_x = math.floor(x_max / grid_spacing_m) + + for k in range(k_start_x, k_end_x + 1): + x_world = k * grid_spacing_m + u = (x_world - grid_origin_x) / float(resolution) + u_int = int(round(u)) + if 0 <= u_int < width: + cv2.line( + img, + (u_int, 0), + (u_int, height - 1), + color=(200, 200, 200), # light gray + thickness=1, + ) + + # Horizontal grid lines (constant y = k * grid_spacing_m) + k_start_y = math.ceil(y_min / grid_spacing_m) + k_end_y = math.floor(y_max / grid_spacing_m) + + for k in range(k_start_y, k_end_y + 1): + y_world = k * grid_spacing_m + v = height - (y_world - grid_origin_y) / float(resolution) + v_int = int(round(v)) + if 0 <= v_int < height: + cv2.line( + img, + (0, v_int), + (width - 1, v_int), + color=(200, 200, 200), # light gray + thickness=1, + ) + + # +X axis endpoint + x_end_u = min(width - 1, origin_u + axis_length_px) + x_end_v = origin_v + + # +Y axis endpoint + y_end_u = origin_u + y_end_v = max(0, origin_v - axis_length_px) + + # Draw +X axis (red) + cv2.arrowedLine( + img, + (origin_u, origin_v), + (x_end_u, x_end_v), + color=(0, 0, 255), + thickness=line_thickness, + tipLength=0.05, + ) + + # Draw +Y axis (green) + cv2.arrowedLine( + img, + (origin_u, origin_v), + (y_end_u, y_end_v), + color=(0, 255, 0), + thickness=line_thickness, + tipLength=0.05, + ) + + # Mark world(0,0) with a small blue circle + cv2.circle( + img, + (origin_u, origin_v), + radius=4, + color=(255, 0, 0), + thickness=-1, + ) + + # Save annotated map + base, ext = os.path.splitext(map_image_path) + annotated_path = base + "_overlay" + (ext or ".png") + os.makedirs(os.path.dirname(annotated_path) or ".", exist_ok=True) + + saved = cv2.imwrite(annotated_path, img) + if not saved: + return {"error": f"Failed to save annotated image to: {annotated_path}"} + + return { + "annotated_map_path": annotated_path, + "width": int(width), + "height": int(height), + "resolution": float(resolution), + "axis_length_m": float(axis_length_m), + "grid_spacing_m": float(grid_spacing_m), + "world_bounds": { + "x": [float(x_min), float(x_max)], + "y": [float(y_min), float(y_max)], + }, + "message": "Axes and grid drawn successfully.", + } + + except Exception as e: + return {"error": f"Exception while drawing axes and grid: {e}"} diff --git a/utils/websocket_manager.py b/utils/websocket_manager.py index e360a1a..5dea839 100644 --- a/utils/websocket_manager.py +++ b/utils/websocket_manager.py @@ -135,6 +135,80 @@ def parse_image(raw: Union[str, bytes] | None) -> dict | None: return _handle_raw_image(data_b64, height, width, encoding, msg, result) +def parse_map(parsed_data: dict) -> dict | None: + """ + Convert a nav_msgs/OccupancyGrid message (already parsed as JSON) + into a PNG image and update the message to reference the image path. + + Args: + parsed_data (dict): Parsed JSON from rosbridge. + + Returns: + dict | None: Updated parsed_data with: + - 'msg.data' removed + - 'msg.map_image_path' set + or None if conversion failed. + """ + # Extract the inner message + msg = parsed_data.get("msg", {}) + info = msg.get("info", {}) + width = info.get("width") + height = info.get("height") + data = msg.get("data") + + # Validate OccupancyGrid fields + if width is None or height is None or data is None: + print("[Map] Missing width, height, or data in OccupancyGrid.", file=sys.stderr) + return None + + if not isinstance(data, list): + print("[Map] OccupancyGrid data is not a list.", file=sys.stderr) + return None + + # Convert list to numpy array and reshape + grid = np.array(data, dtype=np.int16) + expected_size = int(width) * int(height) + if grid.size != expected_size: + print(f"[Map] Size mismatch: got {grid.size}, expected {expected_size}.", file=sys.stderr) + return None + + grid = grid.reshape((height, width)) + + # Map occupancy values to grayscale: + # -1 (unknown) → 127 + # 0 (free) → 255 + # 100 (occupied) → 0 + img = np.zeros_like(grid, dtype=np.uint8) + img[grid == -1] = 127 + img[grid == 0] = 255 + img[grid == 100] = 0 + + img = cv2.flip(img, 0) + + # Ensure output directory exists + os.makedirs("./map", exist_ok=True) + + # Save the image as PNG + map_path = os.path.join("./map", "received_map.png") + success = cv2.imwrite(map_path, img) + if not success: + print(f"[Map] Failed to save OccupancyGrid image at {map_path}", file=sys.stderr) + return None + + print(f"[Map] Saved OccupancyGrid image at {map_path}", file=sys.stderr) + + # Update the original parsed_data: + # - remove large data array + # - add image path + msg_copy = msg.copy() + msg_copy.pop("data", None) + msg_copy["map_image_path"] = map_path + + updated = parsed_data.copy() + updated["msg"] = msg_copy + return updated + + def _handle_compressed_image(data_b64: str, result: dict) -> dict | None: """Handle compressed image data (JPEG/PNG already encoded).""" path = "./camera/received_image_compressed.jpeg" @@ -211,7 +285,9 @@ def _decode_image_data( def parse_input( - raw: Union[str, bytes] | None, expects_image: bool | None = None + raw: Union[str, bytes] | None, + expects_image: bool | None = None, + msg_type: str | None = None, ) -> tuple[dict | None, bool]: """ Parse input data with optional image hint for optimized handling. @@ -239,6 +315,13 @@ def parse_input( if parsed_data is None: return None, False + if msg_type and msg_type.endswith("OccupancyGrid"): + # Try dedicated map handling (convert to image, drop data) + mapped = _handle_map_hint(parsed_data) + if mapped is not None: + # Map was processed successfully; no image parsing involved + return mapped, False + # 3. Handle explicit hints if expects_image is True: return _handle_image_hint(raw, parsed_data) @@ -248,6 +331,22 @@ def parse_input( return _handle_auto_detection(raw, parsed_data) +def _handle_map_hint(parsed_data: dict) -> dict | None: + """ + Handle OccupancyGrid-like messages by converting the map data into a PNG image + and stripping the large 'data' field from the message. + + Args: + parsed_data (dict): Parsed JSON data from rosbridge. + + Returns: + dict | None: Updated parsed data with map image path, + or None if parsing as a map failed. + """ + mapped = parse_map(parsed_data) + return mapped + + def _handle_image_hint(raw: Union[str, bytes], parsed_data: dict) -> tuple[dict | None, bool]: """Handle explicit image hint - try image parsing first.""" print("[Input] Hinted to parse as image", file=sys.stderr)