From eaeff789248f66e42ca46de1013eec8d43d098d4 Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Wed, 6 Aug 2025 19:46:22 +0200 Subject: [PATCH] style(names): renaming attributes names for better clarity --- .../portaudio/microphone_portaudio.py | 154 +++++++++--------- 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/src/lerobot/microphones/portaudio/microphone_portaudio.py b/src/lerobot/microphones/portaudio/microphone_portaudio.py index 2fe056898..7fe1de635 100644 --- a/src/lerobot/microphones/portaudio/microphone_portaudio.py +++ b/src/lerobot/microphones/portaudio/microphone_portaudio.py @@ -75,20 +75,20 @@ class PortAudioMicrophone(Microphone): # Microphone index self.microphone_index = config.microphone_index - # Input audio stream process and events - self.stream_process = None - self.stream_stop_event = process_Event() - self.stream_start_event = process_Event() - self.stream_close_event = process_Event() - self.stream_is_started_event = process_Event() + # Input audio recording process and events + self.record_process = None + self.record_stop_event = process_Event() + self.record_start_event = process_Event() + self.record_close_event = process_Event() + self.record_is_started_event = process_Event() - # Thread/Process-safe concurrent queue to store the recorded/read audio - self.record_queue = None - self.read_queue = None + # Process-safe concurrent queues to store the written/read audio + self.write_queue = process_Queue() + self.read_queue = process_Queue() - # Thread/Process to handle data reading and file writing in a separate thread/process (safely) - self.record_thread = None - self.record_stop_event = None + # Thread/Process to handle data writing in a separate thread/process (safely) + self.write_thread = None + self.write_stop_event = None self.logs = {} self._is_connected = False @@ -199,34 +199,35 @@ class PortAudioMicrophone(Microphone): raise DeviceAlreadyConnectedError(f"Microphone {self.microphone_index} is already connected.") self._configure_capture_settings() - # Create queues - self.record_queue = process_Queue() + + # Create or reset queues + self.write_queue = process_Queue() self.read_queue = process_Queue() # Reset events - self.stream_start_event.clear() - self.stream_stop_event.clear() - self.stream_close_event.clear() - self.stream_is_started_event.clear() + self.record_start_event.clear() + self.record_stop_event.clear() + self.record_close_event.clear() + self.record_is_started_event.clear() - # Create and run audio input stream process + # Create and start an audio input stream with a recording callback # Remark: this is done in a separate process so that audio recording is not impacted by the main thread CPU usage, especially the busy_wait function. - self.stream_process = Process( - target=self._run_audio_input_stream, + self.record_process = Process( + target=self._record_process, args=( self.microphone_index, self.sample_rate, self.channels, - self.stream_start_event, - self.stream_stop_event, - self.stream_close_event, - self.stream_is_started_event, - self.record_queue, + self.record_start_event, + self.record_stop_event, + self.record_close_event, + self.record_is_started_event, + self.write_queue, self.read_queue, ), ) - self.stream_process.daemon = True - self.stream_process.start() + self.record_process.daemon = True + self.record_process.start() self._is_connected = True @@ -240,12 +241,12 @@ class PortAudioMicrophone(Microphone): if self.is_recording: self.stop_recording() - if self.stream_process is not None: - self.stream_close_event.set() + if self.record_process is not None: + self.record_close_event.set() self.read_queue = None - self.record_queue = None - self.stream_process.terminate() # No time to wait - self.stream_process = None + self.write_queue = None + self.record_process.terminate() # No time to wait + self.record_process = None self.is_connected = False def _read(self) -> np.ndarray: @@ -268,7 +269,7 @@ class PortAudioMicrophone(Microphone): """ if not self.is_connected: raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") - if not self.stream.active: + if not self.is_recording: raise RuntimeError(f"Microphone {self.microphone_index} is not recording.") start_time = time.perf_counter() @@ -284,19 +285,19 @@ class PortAudioMicrophone(Microphone): return audio_readings @staticmethod - def _run_audio_input_stream( + def _record_process( microphone_index, sample_rate, channels, - stream_start_event, - stream_stop_event, - stream_close_event, - is_started_event, - record_queue, + record_start_event, + record_stop_event, + record_close_event, + record_is_started_event, + write_queue, read_queue, ) -> None: """ - Process callback used to create an unpickable sounddevice audio input stream and start, stop and close it based on multiprocessing events. + Process callback used to create an unpickable sounddevice audio input stream with a recording callback and start, stop and close it based on multiprocessing events. """ channels_index = np.array(channels) - 1 @@ -310,7 +311,7 @@ class PortAudioMicrophone(Microphone): # Slicing makes copy unnecessary # Two separate queues are necessary because .get() also pops the data from the queue # Remark: this also ensures that file-recorded data and chunk-audio data are the same. - record_queue.put_nowait(indata[:, channels_index]) + write_queue.put_nowait(indata[:, channels_index]) read_queue.put_nowait(indata[:, channels_index]) # Create the audio stream @@ -326,15 +327,16 @@ class PortAudioMicrophone(Microphone): ) while True: - start_flag = stream_start_event.wait(timeout=1.0) - if stream_close_event.is_set(): + start_flag = record_start_event.wait(timeout=0.1) + if record_close_event.is_set(): break elif not start_flag: continue stream.start() - is_started_event.set() - stream_stop_event.wait() + record_is_started_event.set() + record_stop_event.wait() stream.stop() # stream.stop() waits for all buffers to be processed + record_is_started_event.clear() # Remark : stream.abort() flushes the buffers ! stream.close() @@ -347,7 +349,6 @@ class PortAudioMicrophone(Microphone): ) -> None: """ Starts the recording of the microphone. If output_file is provided, the audio will be written to this file. - Remark: multiprocessing is implemented, but does not work well with sounddevice (launching delays, tricky memory sharing, sounddevice streams are not picklable (even with dill #pathos), etc.). """ if not self.is_connected: raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") @@ -356,11 +357,10 @@ class PortAudioMicrophone(Microphone): # Reset queues self._clear_queue(self.read_queue) - self._clear_queue(self.record_queue) + self._clear_queue(self.write_queue) - # Reset events - stream_start_event is already cleared here - self.stream_stop_event.clear() - self.stream_is_started_event.clear() + # Reset stop event + self.record_stop_event.clear() # Write recordings into a file if output_file is provided if output_file is not None: @@ -376,44 +376,44 @@ class PortAudioMicrophone(Microphone): ) if multiprocessing: - self.record_stop_event = process_Event() - self.record_thread = Process( - target=PortAudioMicrophone._record_loop, + self.write_stop_event = process_Event() + self.write_thread = Process( + target=PortAudioMicrophone._write_loop, args=( - self.record_queue, - self.record_stop_event, + self.write_queue, + self.write_stop_event, self.sample_rate, self.channels, output_file, ), ) else: - self.record_stop_event = thread_Event() - self.record_thread = Thread( - target=PortAudioMicrophone._record_loop, + self.write_stop_event = thread_Event() + self.write_thread = Thread( + target=PortAudioMicrophone._write_loop, args=( - self.record_queue, - self.record_stop_event, + self.write_queue, + self.write_stop_event, self.sample_rate, self.channels, output_file, ), ) - self.record_thread.daemon = True + self.write_thread.daemon = True self.is_writing = True if barrier is None: - self.record_thread.start() + self.write_thread.start() - self.stream_start_event.set() # Start the input audio stream process - self.stream_is_started_event.wait() # Wait for the input audio stream process to be actually started + self.record_start_event.set() # Start the input audio stream process + self.record_is_started_event.wait() # Wait for the input audio stream process to be actually started if barrier is not None: barrier.wait() # Wait for multiple input audio streams to be started at the same time self._clear_queue(self.read_queue) - self._clear_queue(self.record_queue) + self._clear_queue(self.write_queue) if output_file is not None: - self.record_thread.start() + self.write_thread.start() self.is_recording = True @@ -426,21 +426,21 @@ class PortAudioMicrophone(Microphone): if not self.is_recording: raise DeviceNotRecordingError(f"Microphone {self.microphone_index} is not recording.") - if self.stream_process is not None: - self.stream_start_event.clear() # Ensures the stream is not started again ! - self.stream_stop_event.set() + if self.record_process is not None: + self.record_start_event.clear() # Ensures the stream is not started again ! + self.record_stop_event.set() self.is_recording = False - if self.record_thread is not None: - self.record_queue.join() - self.record_stop_event.set() - self.record_thread.join() - self.record_thread = None - self.record_stop_event = None + if self.write_thread is not None: + self.write_queue.join() + self.write_stop_event.set() + self.write_thread.join() + self.write_thread = None + self.write_stop_event = None self.is_writing = False @staticmethod - def _record_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None: + def _write_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None: """ Thread/Process-safe loop to write audio data into a file. """