From f28086ff81f9a07320ac80dfd9aa1ae916034797 Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Fri, 9 May 2025 13:56:59 +0200 Subject: [PATCH] refactor(cameras): improvements utils functionalities v0.2 --- .../common/cameras/intel/camera_realsense.py | 35 +- .../cameras/intel/configuration_realsense.py | 8 - .../common/cameras/opencv/camera_opencv.py | 29 +- lerobot/find_cameras.py | 307 +++++++++++------- 4 files changed, 227 insertions(+), 152 deletions(-) diff --git a/lerobot/common/cameras/intel/camera_realsense.py b/lerobot/common/cameras/intel/camera_realsense.py index b4b7eaae5..e5ec0966c 100644 --- a/lerobot/common/cameras/intel/camera_realsense.py +++ b/lerobot/common/cameras/intel/camera_realsense.py @@ -22,7 +22,7 @@ import math import queue import time from threading import Event, Thread -from typing import Dict, List, Tuple, Union +from typing import Any, Dict, List, Tuple, Union import cv2 import numpy as np @@ -155,7 +155,7 @@ class RealSenseCamera(Camera): return self.rs_pipeline is not None and self.rs_profile is not None @staticmethod - def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Union[str, int, float]]]: + def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Any]]: """ Detects available Intel RealSense cameras connected to the system. @@ -163,13 +163,14 @@ class RealSenseCamera(Camera): raise_when_empty (bool): If True, raises an OSError if no cameras are found. Returns: - List[Dict[str, Union[str, int, float]]]: A list of dictionaries, - where each dictionary contains 'type', 'serial_number', 'name', - firmware version, USB type, and other available specs. + List[Dict[str, Any]]: A list of dictionaries, + where each dictionary contains 'type', 'id' (serial number), 'name', + firmware version, USB type, and other available specs, and the default profile properties (width, height, fps, format). Raises: OSError: If `raise_when_empty` is True and no cameras are detected, or if pyrealsense2 is not installed. + ImportError: If pyrealsense2 is not installed. """ found_cameras_info = [] context = rs.context() @@ -185,19 +186,37 @@ class RealSenseCamera(Camera): for device in devices: camera_info = { + "name": device.get_info(rs.camera_info.name), "type": "RealSense", - "serial_number": device.get_info(rs.camera_info.serial_number), + "id": device.get_info(rs.camera_info.serial_number), "firmware_version": device.get_info(rs.camera_info.firmware_version), "usb_type_descriptor": device.get_info(rs.camera_info.usb_type_descriptor), "physical_port": device.get_info(rs.camera_info.physical_port), "product_id": device.get_info(rs.camera_info.product_id), "product_line": device.get_info(rs.camera_info.product_line), - "name": device.get_info(rs.camera_info.name), } + + # Get stream profiles for each sensor + sensors = device.query_sensors() + for sensor in sensors: + profiles = sensor.get_stream_profiles() + + for profile in profiles: + if profile.is_video_stream_profile() and profile.is_default(): + vprofile = profile.as_video_stream_profile() + stream_info = { + "stream_type": vprofile.stream_name(), + "format": vprofile.format().name, + "width": vprofile.width(), + "height": vprofile.height(), + "fps": vprofile.fps(), + } + camera_info["default_stream_profile"] = stream_info + found_cameras_info.append(camera_info) logger.debug(f"Found RealSense camera: {camera_info}") - logger.info(f"Detected RealSense cameras: {[cam['serial_number'] for cam in found_cameras_info]}") + logger.info(f"Detected RealSense cameras: {[cam['id'] for cam in found_cameras_info]}") return found_cameras_info def _find_serial_number_from_name(self, name: str) -> str: diff --git a/lerobot/common/cameras/intel/configuration_realsense.py b/lerobot/common/cameras/intel/configuration_realsense.py index b11fcb8a9..7f8e94eec 100644 --- a/lerobot/common/cameras/intel/configuration_realsense.py +++ b/lerobot/common/cameras/intel/configuration_realsense.py @@ -69,11 +69,3 @@ class RealSenseCameraConfig(CameraConfig): raise ValueError( f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided." ) - - at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None - at_least_one_is_none = self.fps is None or self.width is None or self.height is None - if at_least_one_is_not_none and at_least_one_is_none: - raise ValueError( - "For `fps`, `width` and `height`, either all of them need to be set, or none of them, " - f"but {self.fps=}, {self.width=}, {self.height=} were provided." - ) diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index db2da39fe..7f2c2aa54 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -24,7 +24,7 @@ import queue import time from pathlib import Path from threading import Event, Thread -from typing import Dict, List, Union +from typing import Any, Dict, List import cv2 import numpy as np @@ -59,7 +59,7 @@ class OpenCVCamera(Camera): or port changes, especially on Linux. Use the provided utility script to find available camera indices or paths: ```bash - NOTE(Steven): Point to future util + python -m lerobot.find_cameras ``` The camera's default settings (FPS, resolution, color mode) are used unless @@ -132,7 +132,7 @@ class OpenCVCamera(Camera): self.logs: dict = {} # NOTE(Steven): Might be removed in the future self.rotation: int | None = get_cv2_rotation(config.rotation) - self.backend: int = get_cv2_backend() + self.backend: int = get_cv2_backend() # NOTE(Steven): If I specify backend the opencv open fails def __str__(self) -> str: """Returns a string representation of the camera instance.""" @@ -195,7 +195,7 @@ class OpenCVCamera(Camera): cv2.setNumThreads(1) logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...") - self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend) + self.videocapture_camera = cv2.VideoCapture(self.index_or_path) if not self.videocapture_camera.isOpened(): self.videocapture_camera.release() @@ -273,7 +273,7 @@ class OpenCVCamera(Camera): @staticmethod def find_cameras( max_index_search_range=MAX_OPENCV_INDEX, raise_when_empty: bool = True - ) -> List[Dict[str, Union[str, int, float]]]: + ) -> List[Dict[str, Any]]: """ Detects available OpenCV cameras connected to the system. @@ -285,9 +285,9 @@ class OpenCVCamera(Camera): raise_when_empty (bool): If True, raises an OSError if no cameras are found. Returns: - List[Dict[str, Union[str, int, float]]]: A list of dictionaries, + List[Dict[str, Any]]: A list of dictionaries, where each dictionary contains 'type', 'id' (port index or path), - 'default_width', 'default_height', and 'default_fps'. + and the default profile properties (width, height, fps, format). """ found_cameras_info = [] @@ -303,20 +303,25 @@ class OpenCVCamera(Camera): targets_to_scan = list(range(max_index_search_range)) for target in targets_to_scan: - camera = cv2.VideoCapture(target, get_cv2_backend()) + camera = cv2.VideoCapture(target) if camera.isOpened(): default_width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)) default_height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) default_fps = camera.get(cv2.CAP_PROP_FPS) - + default_format = camera.get(cv2.CAP_PROP_FORMAT) camera_info = { + "name": f"OpenCV Camera @ {target}", "type": "OpenCV", "id": target, - "default_width": default_width, - "default_height": default_height, - "default_fps": default_fps, "backend_api": camera.getBackendName(), + "default_stream_profile": { + "format": default_format, + "width": default_width, + "height": default_height, + "fps": default_fps, + }, } + found_cameras_info.append(camera_info) logger.debug(f"Found OpenCV camera:: {camera_info}") camera.release() diff --git a/lerobot/find_cameras.py b/lerobot/find_cameras.py index 8d01a8fcb..de5c09a04 100644 --- a/lerobot/find_cameras.py +++ b/lerobot/find_cameras.py @@ -20,7 +20,7 @@ import logging import shutil import time from pathlib import Path -from typing import Dict, List, Union +from typing import Any, Dict, List, Optional, Union import numpy as np from PIL import Image @@ -32,21 +32,21 @@ from lerobot.common.cameras.opencv.camera_opencv import OpenCVCamera from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig logger = logging.getLogger(__name__) +# logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(module)s - %(message)s") -def find_all_opencv_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]: +def find_all_opencv_cameras() -> List[Dict[str, Any]]: """ Finds all available OpenCV cameras plugged into the system. Returns: A list of all available OpenCV cameras with their metadata. """ - all_opencv_cameras_info: List[Dict[str, Union[str, int, float, List[str], None]]] = [] + all_opencv_cameras_info: List[Dict[str, Any]] = [] logger.info("Searching for OpenCV cameras...") try: opencv_cameras = OpenCVCamera.find_cameras(raise_when_empty=False) for cam_info in opencv_cameras: - cam_info.setdefault("name", f"OpenCV Camera @ {cam_info['id']}") all_opencv_cameras_info.append(cam_info) logger.info(f"Found {len(opencv_cameras)} OpenCV cameras.") except Exception as e: @@ -55,14 +55,14 @@ def find_all_opencv_cameras() -> List[Dict[str, Union[str, int, float, List[str] return all_opencv_cameras_info -def find_all_realsense_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]: +def find_all_realsense_cameras() -> List[Dict[str, Any]]: """ Finds all available RealSense cameras plugged into the system. Returns: A list of all available RealSense cameras with their metadata. """ - all_realsense_cameras_info: List[Dict[str, Union[str, int, float, List[str], None]]] = [] + all_realsense_cameras_info: List[Dict[str, Any]] = [] logger.info("Searching for RealSense cameras...") try: realsense_cameras = RealSenseCamera.find_cameras(raise_when_empty=False) @@ -77,29 +77,44 @@ def find_all_realsense_cameras() -> List[Dict[str, Union[str, int, float, List[s return all_realsense_cameras_info -def find_all_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]: +def find_and_print_cameras(camera_type_filter: Optional[str] = None) -> List[Dict[str, Any]]: """ - Finds all available cameras (OpenCV and RealSense) plugged into the system. + Finds available cameras based on an optional filter and prints their information. + + Args: + camera_type_filter: Optional string to filter cameras ("realsense" or "opencv"). + If None, lists all cameras. Returns: - A unified list of all available cameras with their metadata. + A list of all available cameras matching the filter, with their metadata. """ + all_cameras_info: List[Dict[str, Any]] = [] - all_opencv_cameras_info = find_all_opencv_cameras() - all_realsense_cameras_info = find_all_realsense_cameras() + if camera_type_filter: + camera_type_filter = camera_type_filter.lower() - all_cameras_info = all_opencv_cameras_info + all_realsense_cameras_info + if camera_type_filter is None or camera_type_filter == "opencv": + all_cameras_info.extend(find_all_opencv_cameras()) + if camera_type_filter is None or camera_type_filter == "realsense": + all_cameras_info.extend(find_all_realsense_cameras()) if not all_cameras_info: - logger.warning("No cameras (OpenCV or RealSense) were detected.") + if camera_type_filter: + logger.warning(f"No {camera_type_filter} cameras were detected.") + else: + logger.warning("No cameras (OpenCV or RealSense) were detected.") else: print("\n--- Detected Cameras ---") for i, cam_info in enumerate(all_cameras_info): print(f"Camera #{i + 1}:") for key, value in cam_info.items(): - print(f" {key.replace('_', ' ').capitalize()}: {value}") + if key == "default_stream_profile" and isinstance(value, dict): + print(f" {key.replace('_', ' ').capitalize()}:") + for sub_key, sub_value in value.items(): + print(f" {sub_key.capitalize()}: {sub_value}") + else: + print(f" {key.replace('_', ' ').capitalize()}: {value}") print("-" * 20) - return all_cameras_info @@ -127,176 +142,220 @@ def save_image( logger.error(f"Failed to save image for camera {camera_identifier} (type {camera_type}): {e}") -def save_images_from_all_cameras( - output_dir: Union[str, Path], - width: int = 640, - height: int = 480, - record_time_s: int = 2, -): - """ - Connects to all detected cameras and saves a few images from each. - - Args: - output_dir: Directory to save images. - width: Target width. - height: Target height. - record_time_s: Duration in seconds to record images. - """ +def initialize_output_directory(output_dir: Union[str, Path]) -> Path: + """Initialize and clean the output directory.""" output_dir = Path(output_dir) if output_dir.exists(): logger.info(f"Output directory {output_dir} exists. Removing previous content.") shutil.rmtree(output_dir) output_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Saving images to {output_dir}") + return output_dir + + +def create_camera_instance(cam_meta: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Create and connect to a camera instance based on metadata.""" + cam_type = cam_meta.get("type") + cam_id = cam_meta.get("id") + default_profile = cam_meta.get("default_stream_profile") + width = default_profile.get("width") + height = default_profile.get("height") + instance = None + + logger.info(f"Preparing {cam_type} ID {cam_id} with profile: Width={width}, Height={height}") + + try: + if cam_type == "OpenCV": + cv_config = OpenCVCameraConfig( + index_or_path=cam_id, + color_mode=ColorMode.RGB, + width=width, + height=height, + ) + instance = OpenCVCamera(cv_config) + elif cam_type == "RealSense": + rs_config = RealSenseCameraConfig( + serial_number=str(cam_id), + color_mode=ColorMode.RGB, + width=width, + height=height, + ) + instance = RealSenseCamera(rs_config) + else: + logger.warning(f"Unknown camera type: {cam_type} for ID {cam_id}. Skipping.") + return None + + if instance: + logger.info(f"Connecting to {cam_type} camera: {cam_id}...") + instance.connect() + return {"instance": instance, "meta": cam_meta} + except Exception as e: + logger.error(f"Failed to connect or configure {cam_type} camera {cam_id}: {e}") + if instance and instance.is_connected: + instance.disconnect() + return None + + +def process_camera_image( + cam_dict: Dict[str, Any], output_dir: Path, current_time: float +) -> Optional[concurrent.futures.Future]: + """Capture and process an image from a single camera.""" + cam = cam_dict["instance"] + meta = cam_dict["meta"] + cam_type_str = str(meta.get("type", "unknown")) + cam_id_str = str(meta.get("id", "unknown")) + + try: + image_data = cam.read() + + return save_image( + image_data, + cam_id_str, + output_dir, + cam_type_str, + ) + except TimeoutError: + logger.warning( + f"Timeout reading from {cam_type_str} camera {cam_id_str} at time {current_time:.2f}s." + ) + except Exception as e: + logger.error(f"Error reading from {cam_type_str} camera {cam_id_str}: {e}") + return None + + +def cleanup_cameras(cameras_to_use: List[Dict[str, Any]]): + """Disconnect all cameras.""" + logger.info(f"Disconnecting {len(cameras_to_use)} cameras...") + for cam_dict in cameras_to_use: + try: + if cam_dict["instance"] and cam_dict["instance"].is_connected: + cam_dict["instance"].disconnect() + except Exception as e: + logger.error(f"Error disconnecting camera {cam_dict['meta'].get('id')}: {e}") + + +def save_images_from_all_cameras( + output_dir: Union[str, Path], + record_time_s: float = 2.0, + camera_type_filter: Optional[str] = None, +): + """ + Connects to detected cameras (optionally filtered by type) and saves images from each. + Uses default stream profiles for width, height, and FPS. + + Args: + output_dir: Directory to save images. + record_time_s: Duration in seconds to record images. + camera_type_filter: Optional string to filter cameras ("realsense" or "opencv"). + If None, uses all detected cameras. + """ + output_dir = initialize_output_directory(output_dir) + all_camera_metadata = find_and_print_cameras(camera_type_filter=camera_type_filter) - all_camera_metadata = find_all_cameras() if not all_camera_metadata: - logger.warning("No cameras detected. Cannot save images.") + logger.warning("No cameras detected matching the criteria. Cannot save images.") return + # Create and connect to all cameras cameras_to_use = [] for cam_meta in all_camera_metadata: - cam_type = cam_meta.get("type") - cam_id = cam_meta.get("id") - instance = None - - try: - if cam_type == "OpenCV": - cv_config = OpenCVCameraConfig( - index_or_path=cam_id, color_mode=ColorMode.RGB, width=width, height=height, fps=30 - ) - instance = OpenCVCamera(cv_config) - elif cam_type == "RealSense": - rs_config = RealSenseCameraConfig( - serial_number=str(cam_id), width=width, height=height, fps=30 - ) - instance = RealSenseCamera(rs_config) - else: - logger.warning(f"Unknown camera type: {cam_type} for ID {cam_id}. Skipping.") - continue - - if instance: - logger.info(f"Connecting to {cam_type} camera: {cam_id}...") - instance.connect() - cameras_to_use.append({"instance": instance, "meta": cam_meta}) - except Exception as e: - logger.error(f"Failed to connect or configure {cam_type} camera {cam_id}: {e}") - if instance and instance.is_connected: - instance.disconnect() + camera_instance = create_camera_instance(cam_meta) + if camera_instance: + cameras_to_use.append(camera_instance) if not cameras_to_use: logger.warning("No cameras could be connected. Aborting image save.") return logger.info(f"Starting image capture for {record_time_s} seconds from {len(cameras_to_use)} cameras.") - frame_index = 0 start_time = time.perf_counter() with concurrent.futures.ThreadPoolExecutor(max_workers=len(cameras_to_use) * 2) as executor: try: while time.perf_counter() - start_time < record_time_s: futures = [] + current_capture_time = time.perf_counter() for cam_dict in cameras_to_use: - cam = cam_dict["instance"] - meta = cam_dict["meta"] - cam_type_str = str(meta.get("type", "unknown")) - cam_id_str = str(meta.get("id", "unknown")) + future = process_camera_image(cam_dict, output_dir, current_capture_time) + if future: + futures.append(future) - try: - image_data = cam.read() - - if image_data is None: - logger.warning( - f"No frame received from {cam_type_str} camera {cam_id_str} for frame {frame_index}." - ) - continue - - futures.append( - executor.submit( - save_image, - image_data, - cam_id_str, - output_dir, - cam_type_str, - ) - ) - - except TimeoutError: - logger.warning( - f"Timeout reading from {cam_type_str} camera {cam_id_str} for frame {frame_index}." - ) - except Exception as e: - logger.error(f"Error reading from {cam_type_str} camera {cam_id_str}: {e}") - - concurrent.futures.wait(futures) + if futures: + concurrent.futures.wait(futures) except KeyboardInterrupt: logger.info("Capture interrupted by user.") finally: print("\nFinalizing image saving...") executor.shutdown(wait=True) - logger.info(f"Disconnecting {len(cameras_to_use)} cameras...") - for cam_dict in cameras_to_use: - try: - if cam_dict["instance"] and cam_dict["instance"].is_connected: - cam_dict["instance"].disconnect() - except Exception as e: - logger.error(f"Error disconnecting camera {cam_dict['meta'].get('id')}: {e}") + cleanup_cameras(cameras_to_use) logger.info(f"Image capture finished. Images saved to {output_dir}") -# NOTE(Steven): Add CLI for finding-cameras of just one type -# NOTE(Steven): Check why opencv detects realsense cameras -# NOTE(Steven): Check why saving cameras is buggy -# NOTE(Steven): Check how to deal with different resolutions macos -# NOTE(Steven): Ditch width height resolutions in favor of defaults if __name__ == "__main__": parser = argparse.ArgumentParser( description="Unified camera utility script for listing cameras and capturing images." ) - subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") + subparsers = parser.add_subparsers(dest="command", help="Available commands") # List cameras command list_parser = subparsers.add_parser( - "list-cameras", help="Shows all connected cameras (OpenCV and RealSense)" + "list-cameras", help="Shows connected cameras. Optionally filter by type (realsense or opencv)." ) - list_parser.set_defaults(func=lambda args: find_all_cameras()) + list_parser.add_argument( + "camera_type", + type=str, + nargs="?", + default=None, + choices=["realsense", "opencv"], + help="Specify camera type to list (e.g., 'realsense', 'opencv'). Lists all if omitted.", + ) + list_parser.set_defaults(func=lambda args: find_and_print_cameras(args.camera_type)) # Capture images command - capture_parser = subparsers.add_parser("capture-images", help="Saves images from all detected cameras") + capture_parser = subparsers.add_parser( + "capture-images", + help="Saves images from detected cameras (optionally filtered by type) using their default stream profiles.", + ) + capture_parser.add_argument( + "camera_type", + type=str, + nargs="?", + default=None, + choices=["realsense", "opencv"], + help="Specify camera type to capture from (e.g., 'realsense', 'opencv'). Captures from all if omitted.", + ) capture_parser.add_argument( "--output-dir", type=Path, default="outputs/captured_images", help="Directory to save images. Default: outputs/captured_images", ) - capture_parser.add_argument( - "--width", - type=int, - default=1920, - help="Set the capture width for all cameras. If not provided, uses camera defaults.", - ) - capture_parser.add_argument( - "--height", - type=int, - default=1080, - help="Set the capture height for all cameras. If not provided, uses camera defaults.", - ) capture_parser.add_argument( "--record-time-s", type=float, - default=10.0, - help="Set the number of seconds to record frames. Default: 2.0 seconds.", + default=5.0, + help="Time duration to attempt capturing frames. Default: 0.5 seconds (usually enough for one frame).", ) capture_parser.set_defaults( func=lambda args: save_images_from_all_cameras( output_dir=args.output_dir, - width=args.width, - height=args.height, record_time_s=args.record_time_s, + camera_type_filter=args.camera_type, ) ) args = parser.parse_args() - args.func(args) + + if args.command is None: + default_output_dir = capture_parser.get_default("output_dir") + default_record_time_s = capture_parser.get_default("record_time_s") + + save_images_from_all_cameras( + output_dir=default_output_dir, + record_time_s=default_record_time_s, + camera_type_filter=None, + ) + else: + args.func(args)