From 0d4922ce49dc68c1dfb5f1f983ca07cc4babba27 Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Wed, 6 Aug 2025 19:55:20 +0200 Subject: [PATCH] refactor(properties): making microphones properties more robust and adding proper checks on state changes --- .../portaudio/microphone_portaudio.py | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/src/lerobot/microphones/portaudio/microphone_portaudio.py b/src/lerobot/microphones/portaudio/microphone_portaudio.py index 7fe1de635..f9be60b53 100644 --- a/src/lerobot/microphones/portaudio/microphone_portaudio.py +++ b/src/lerobot/microphones/portaudio/microphone_portaudio.py @@ -91,13 +91,18 @@ class PortAudioMicrophone(Microphone): self.write_stop_event = None self.logs = {} - self._is_connected = False - self.is_recording = False - self.is_writing = False @property def is_connected(self) -> bool: - return self._is_connected + return self.record_process is not None and self.record_process.is_alive() + + @property + def is_recording(self) -> bool: + return self.record_is_started_event.is_set() + + @property + def is_writing(self) -> bool: + return self.write_thread is not None and self.write_thread.is_alive() @staticmethod def find_microphones() -> list[dict[str, Any]]: @@ -229,7 +234,9 @@ class PortAudioMicrophone(Microphone): self.record_process.daemon = True self.record_process.start() - self._is_connected = True + time.sleep(0.1) # Wait for the recording process to be started... + if not self.is_connected: + raise RuntimeError(f"Error connecting microphone {self.microphone_index}.") def disconnect(self) -> None: """ @@ -241,13 +248,13 @@ class PortAudioMicrophone(Microphone): if self.is_recording: self.stop_recording() - if self.record_process is not None: - self.record_close_event.set() - self.read_queue = None - self.write_queue = None - self.record_process.terminate() # No time to wait - self.record_process = None - self.is_connected = False + self.record_close_event.set() + self.read_queue.close() + self.write_queue.close() + self.record_process.join() + + if self.is_connected: + raise RuntimeError(f"Error disconnecting microphone {self.microphone_index}.") def _read(self) -> np.ndarray: """ @@ -258,6 +265,7 @@ class PortAudioMicrophone(Microphone): while True: try: audio_readings = np.concatenate((audio_readings, self.read_queue.get_nowait()), axis=0) + self.read_queue.task_done() except Empty: break @@ -400,7 +408,7 @@ class PortAudioMicrophone(Microphone): ), ) self.write_thread.daemon = True - self.is_writing = True + if barrier is None: self.write_thread.start() @@ -415,7 +423,10 @@ class PortAudioMicrophone(Microphone): if output_file is not None: self.write_thread.start() - self.is_recording = True + if not self.is_recording: + raise RuntimeError(f"Error starting recording for microphone {self.microphone_index}.") + if output_file is not None and not self.is_writing: + raise RuntimeError(f"Error starting writing for microphone {self.microphone_index}.") def stop_recording(self) -> None: """ @@ -426,18 +437,23 @@ class PortAudioMicrophone(Microphone): if not self.is_recording: raise DeviceNotRecordingError(f"Microphone {self.microphone_index} is not recording.") - if self.record_process is not None: - self.record_start_event.clear() # Ensures the stream is not started again ! - self.record_stop_event.set() - self.is_recording = False + self.record_start_event.clear() # Ensures the audio stream is not started again ! + self.record_stop_event.set() - if self.write_thread is not None: - self.write_queue.join() + while self.is_recording: + time.sleep(0.01) + + self._clear_queue(self.read_queue, join_queue=True) + self._clear_queue(self.write_queue, join_queue=True) + + if self.is_writing: self.write_stop_event.set() self.write_thread.join() - self.write_thread = None - self.write_stop_event = None - self.is_writing = False + + if self.is_recording: + raise RuntimeError(f"Error stopping recording for microphone {self.microphone_index}.") + if self.is_writing: + raise RuntimeError(f"Error stopping writing for microphone {self.microphone_index}.") @staticmethod def _write_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None: @@ -463,7 +479,7 @@ class PortAudioMicrophone(Microphone): continue @staticmethod - def _clear_queue(queue): + def _clear_queue(queue, join_queue: bool = False): """ Clears the queue by getting all items until it is empty. The longer the queue, the longer it takes to clear it. """ @@ -472,4 +488,6 @@ class PortAudioMicrophone(Microphone): queue.get_nowait() queue.task_done() except Empty: + if join_queue: + queue.join() return