From 95de732e55c57dc910d80d25035ba9ac4e7b5cec Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Wed, 4 Jun 2025 14:33:59 +0200 Subject: [PATCH] [skip-ci] feat(multiprocess audio stream): running input audio stream in a separate process to avoid extensive CPU usage impacts --- .../portaudio/microphone_portaudio.py | 232 ++++++++++++------ src/lerobot/microphones/utils.py | 5 +- 2 files changed, 163 insertions(+), 74 deletions(-) diff --git a/src/lerobot/microphones/portaudio/microphone_portaudio.py b/src/lerobot/microphones/portaudio/microphone_portaudio.py index 3c10ece67..a601a7d04 100644 --- a/src/lerobot/microphones/portaudio/microphone_portaudio.py +++ b/src/lerobot/microphones/portaudio/microphone_portaudio.py @@ -20,8 +20,8 @@ import logging import time from multiprocessing import Event as process_Event, JoinableQueue as process_Queue, Process from pathlib import Path -from queue import Empty, Queue as thread_Queue -from threading import Event, Event as thread_Event, Thread +from queue import Empty +from threading import Barrier, Event, Event as thread_Event, Thread from typing import Any import numpy as np @@ -56,9 +56,7 @@ class PortAudioMicrophone(Microphone): microphone.connect() microphone.start_recording("some/output/file.wav") ... - audio_readings = ( - microphone.read() - ) # Gets all recorded audio data since the last read or since the beginning of the recording + audio_readings = microphone.read() # Gets all recorded audio data since the last read or since the beginning of the recording. The longer the period the longer the reading time ! ... microphone.stop_recording() microphone.disconnect() @@ -77,8 +75,12 @@ class PortAudioMicrophone(Microphone): # Microphone index self.microphone_index = config.microphone_index - # Input audio stream - self.stream = None + # Input audio stream process and events + self.stream_process = None + self.stream_stop_event = process_Event() + self.stream_start_event = process_Event() + self.stream_close_event = process_Event() + self.stream_is_started_event = process_Event() # Thread/Process-safe concurrent queue to store the recorded/read audio self.record_queue = None @@ -164,55 +166,54 @@ class PortAudioMicrophone(Microphone): # Get channels index instead of number for slicing self.channels_index = np.array(self.channels) - 1 - # Create the audio stream - self.stream = sd.InputStream( - device=self.microphone_index, - samplerate=self.sample_rate, - channels=max(self.channels), - dtype="float32", - blocksize=0, # Varying input buffer length, but no additional latency - latency="low", # Low latency mode (not enabled by default !) - # never_drop_input=True, # Disabled as it generates an error for some devices - callback=self._audio_callback, + # Create queues + self.record_queue = process_Queue() + self.read_queue = process_Queue() + + # Reset events + self.stream_start_event.clear() + self.stream_stop_event.clear() + self.stream_close_event.clear() + self.stream_is_started_event.clear() + + # Create and run audio input stream process + # Remark: this is done in a separate process so that audio recording is not impacted by the main thread CPU usage, especially the busy_wait function. + self.stream_process = Process( + target=self._run_audio_input_stream, + args=( + self.microphone_index, + self.sample_rate, + self.channels, + self.stream_start_event, + self.stream_stop_event, + self.stream_close_event, + self.stream_is_started_event, + self.record_queue, + self.read_queue, + ), ) + self.stream_process.daemon = True + self.stream_process.start() self._is_connected = True - def _audio_callback(self, indata, frames, timestamp, status) -> None: + def disconnect(self) -> None: """ - Low-level sounddevice callback. + Disconnects the microphone and stops the recording. """ - if status: - logging.warning(status) - # Slicing makes copy unnecessary - # Two separate queues are necessary because .get() also pops the data from the queue - # Remark: this also ensures that file-recorded data and chunk-audio data are the same. - if self.is_writing: - self.record_queue.put_nowait(indata[:, self.channels_index]) - self.read_queue.put_nowait(indata[:, self.channels_index]) + if not self.is_connected: + raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") - @staticmethod - def _record_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None: - """ - Thread/Process-safe loop to write audio data into a file. - """ - # Can only be run on a single process/thread for file writing safety - with SoundFile( - output_file, - mode="w", - samplerate=sample_rate, - channels=max(channels), - format="WAV", - subtype="FLOAT", # By default, a much lower quality WAV file is created ! - ) as file: - while not event.is_set(): - try: - file.write( - queue.get(timeout=0.005) - ) # Timeout set as the usual sounddevice buffer size. get_nowait is not possible here as it saturates the thread. - queue.task_done() - except Empty: - continue + if self.is_recording: + self.stop_recording() + + if self.stream_process is not None: + self.stream_close_event.set() + self.read_queue = None + self.record_queue = None + self.stream_process.terminate() # No time to wait + self.stream_process = None + self.is_connected = False def _read(self) -> np.ndarray: """ @@ -226,8 +227,6 @@ class PortAudioMicrophone(Microphone): except Empty: break - self.read_queue = thread_Queue() - return audio_readings def read(self) -> np.ndarray: @@ -251,11 +250,67 @@ class PortAudioMicrophone(Microphone): return audio_readings + @staticmethod + def _run_audio_input_stream( + microphone_index, + sample_rate, + channels, + stream_start_event, + stream_stop_event, + stream_close_event, + is_started_event, + record_queue, + read_queue, + ) -> None: + """ + Process callback used to create an unpickable sounddevice audio input stream and start, stop and close it based on multiprocessing events. + """ + + channels_index = np.array(channels) - 1 + + def audio_callback(indata, frames, timestamp, status) -> None: + """ + Low-level sounddevice callback. + """ + if status: + logging.warning(status) + # Slicing makes copy unnecessary + # Two separate queues are necessary because .get() also pops the data from the queue + # Remark: this also ensures that file-recorded data and chunk-audio data are the same. + record_queue.put_nowait(indata[:, channels_index]) + read_queue.put_nowait(indata[:, channels_index]) + + # Create the audio stream + stream = sd.InputStream( + device=microphone_index, + samplerate=sample_rate, + channels=max(channels), + dtype="float32", + blocksize=0, # Varying input buffer length, but no additional latency + latency="low", # Low latency mode (not enabled by default !) + # never_drop_input=True, # Disabled as it generates an error for some devices + callback=audio_callback, + ) + + while True: + start_flag = stream_start_event.wait(timeout=1.0) + if stream_close_event.is_set(): + break + elif not start_flag: + continue + stream.start() + is_started_event.set() + stream_stop_event.wait() + stream.stop() # stream.stop() waits for all buffers to be processed + # Remark : stream.abort() flushes the buffers ! + stream.close() + def start_recording( self, output_file: str | None = None, multiprocessing: bool | None = False, overwrite: bool | None = True, + barrier: Barrier | None = None, ) -> None: """ Starts the recording of the microphone. If output_file is provided, the audio will be written to this file. @@ -267,11 +322,12 @@ class PortAudioMicrophone(Microphone): raise DeviceAlreadyRecordingError(f"Microphone {self.microphone_index} is already recording.") # Reset queues - self.read_queue = thread_Queue() - if multiprocessing: - self.record_queue = process_Queue() - else: - self.record_queue = thread_Queue() + self._clear_queue(self.read_queue) + self._clear_queue(self.record_queue) + + # Reset events - stream_start_event is already cleared here + self.stream_stop_event.clear() + self.stream_is_started_event.clear() # Write recordings into a file if output_file is provided if output_file is not None: @@ -311,12 +367,22 @@ class PortAudioMicrophone(Microphone): ), ) self.record_thread.daemon = True - self.record_thread.start() - self.is_writing = True + if barrier is None: + self.record_thread.start() + + self.stream_start_event.set() # Start the input audio stream process + self.stream_is_started_event.wait() # Wait for the input audio stream process to be actually started + + if barrier is not None: + barrier.wait() # Wait for multiple input audio streams to be started at the same time + + self._clear_queue(self.read_queue) + self._clear_queue(self.record_queue) + if output_file is not None: + self.record_thread.start() self.is_recording = True - self.stream.start() def stop_recording(self) -> None: """ @@ -327,9 +393,9 @@ class PortAudioMicrophone(Microphone): if not self.is_recording: raise DeviceNotRecordingError(f"Microphone {self.microphone_index} is not recording.") - if self.stream.active: - self.stream.stop() # Wait for all buffers to be processed - # Remark : stream.abort() flushes the buffers ! + if self.stream_process is not None: + self.stream_start_event.clear() # Ensures the stream is not started again ! + self.stream_stop_event.set() self.is_recording = False if self.record_thread is not None: @@ -340,17 +406,37 @@ class PortAudioMicrophone(Microphone): self.record_stop_event = None self.is_writing = False - self.logs["stop_timestamp"] = capture_timestamp_utc() - - def disconnect(self) -> None: + @staticmethod + def _record_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None: """ - Disconnects the microphone and stops the recording. + Thread/Process-safe loop to write audio data into a file. """ - if not self.is_connected: - raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") + # Can only be run on a single process/thread for file writing safety + with SoundFile( + output_file, + mode="w", + samplerate=sample_rate, + channels=len(channels), + format="WAV", + subtype="FLOAT", # By default, a much lower quality WAV file is created ! + ) as file: + while not event.is_set(): + try: + file.write( + queue.get(timeout=0.005) + ) # Timeout set as the usual sounddevice buffer size. get_nowait is not possible here as it saturates the thread. + queue.task_done() + except Empty: + continue - if self.is_recording: - self.stop_recording() - - self.stream.close() - self._is_connected = False + @staticmethod + def _clear_queue(queue): + """ + Clears the queue by getting all items until it is empty. The longer the queue, the longer it takes to clear it. + """ + try: + while True: + queue.get_nowait() + queue.task_done() + except Empty: + return diff --git a/src/lerobot/microphones/utils.py b/src/lerobot/microphones/utils.py index 6613d0976..bf79041bd 100644 --- a/src/lerobot/microphones/utils.py +++ b/src/lerobot/microphones/utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from multiprocessing import Barrier from queue import Queue from threading import Thread @@ -49,9 +50,11 @@ def async_microphones_start_recording( if output_files is None: output_files = [None] * len(microphones) + barrier = Barrier(len(microphones)) + for microphone, output_file in zip(microphones.values(), output_files, strict=False): start_recording_threads.append( - Thread(target=microphone.start_recording, args=(output_file, multiprocessing, overwrite)) + Thread(target=microphone.start_recording, args=(output_file, multiprocessing, overwrite, barrier)) ) for thread in start_recording_threads: