[skip-ci] feat(multiprocess audio stream): running input audio stream in a separate process to avoid extensive CPU usage impacts

This commit is contained in:
CarolinePascal
2025-06-04 14:33:59 +02:00
parent b2383236ca
commit 95de732e55
2 changed files with 163 additions and 74 deletions

View File

@@ -20,8 +20,8 @@ import logging
import time
from multiprocessing import Event as process_Event, JoinableQueue as process_Queue, Process
from pathlib import Path
from queue import Empty, Queue as thread_Queue
from threading import Event, Event as thread_Event, Thread
from queue import Empty
from threading import Barrier, Event, Event as thread_Event, Thread
from typing import Any
import numpy as np
@@ -56,9 +56,7 @@ class PortAudioMicrophone(Microphone):
microphone.connect()
microphone.start_recording("some/output/file.wav")
...
audio_readings = (
microphone.read()
) # Gets all recorded audio data since the last read or since the beginning of the recording
audio_readings = microphone.read() # Gets all recorded audio data since the last read or since the beginning of the recording. The longer the period the longer the reading time !
...
microphone.stop_recording()
microphone.disconnect()
@@ -77,8 +75,12 @@ class PortAudioMicrophone(Microphone):
# Microphone index
self.microphone_index = config.microphone_index
# Input audio stream
self.stream = None
# Input audio stream process and events
self.stream_process = None
self.stream_stop_event = process_Event()
self.stream_start_event = process_Event()
self.stream_close_event = process_Event()
self.stream_is_started_event = process_Event()
# Thread/Process-safe concurrent queue to store the recorded/read audio
self.record_queue = None
@@ -164,55 +166,54 @@ class PortAudioMicrophone(Microphone):
# Get channels index instead of number for slicing
self.channels_index = np.array(self.channels) - 1
# Create the audio stream
self.stream = sd.InputStream(
device=self.microphone_index,
samplerate=self.sample_rate,
channels=max(self.channels),
dtype="float32",
blocksize=0, # Varying input buffer length, but no additional latency
latency="low", # Low latency mode (not enabled by default !)
# never_drop_input=True, # Disabled as it generates an error for some devices
callback=self._audio_callback,
# Create queues
self.record_queue = process_Queue()
self.read_queue = process_Queue()
# Reset events
self.stream_start_event.clear()
self.stream_stop_event.clear()
self.stream_close_event.clear()
self.stream_is_started_event.clear()
# Create and run audio input stream process
# Remark: this is done in a separate process so that audio recording is not impacted by the main thread CPU usage, especially the busy_wait function.
self.stream_process = Process(
target=self._run_audio_input_stream,
args=(
self.microphone_index,
self.sample_rate,
self.channels,
self.stream_start_event,
self.stream_stop_event,
self.stream_close_event,
self.stream_is_started_event,
self.record_queue,
self.read_queue,
),
)
self.stream_process.daemon = True
self.stream_process.start()
self._is_connected = True
def _audio_callback(self, indata, frames, timestamp, status) -> None:
def disconnect(self) -> None:
"""
Low-level sounddevice callback.
Disconnects the microphone and stops the recording.
"""
if status:
logging.warning(status)
# Slicing makes copy unnecessary
# Two separate queues are necessary because .get() also pops the data from the queue
# Remark: this also ensures that file-recorded data and chunk-audio data are the same.
if self.is_writing:
self.record_queue.put_nowait(indata[:, self.channels_index])
self.read_queue.put_nowait(indata[:, self.channels_index])
if not self.is_connected:
raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
@staticmethod
def _record_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None:
"""
Thread/Process-safe loop to write audio data into a file.
"""
# Can only be run on a single process/thread for file writing safety
with SoundFile(
output_file,
mode="w",
samplerate=sample_rate,
channels=max(channels),
format="WAV",
subtype="FLOAT", # By default, a much lower quality WAV file is created !
) as file:
while not event.is_set():
try:
file.write(
queue.get(timeout=0.005)
) # Timeout set as the usual sounddevice buffer size. get_nowait is not possible here as it saturates the thread.
queue.task_done()
except Empty:
continue
if self.is_recording:
self.stop_recording()
if self.stream_process is not None:
self.stream_close_event.set()
self.read_queue = None
self.record_queue = None
self.stream_process.terminate() # No time to wait
self.stream_process = None
self.is_connected = False
def _read(self) -> np.ndarray:
"""
@@ -226,8 +227,6 @@ class PortAudioMicrophone(Microphone):
except Empty:
break
self.read_queue = thread_Queue()
return audio_readings
def read(self) -> np.ndarray:
@@ -251,11 +250,67 @@ class PortAudioMicrophone(Microphone):
return audio_readings
@staticmethod
def _run_audio_input_stream(
microphone_index,
sample_rate,
channels,
stream_start_event,
stream_stop_event,
stream_close_event,
is_started_event,
record_queue,
read_queue,
) -> None:
"""
Process callback used to create an unpickable sounddevice audio input stream and start, stop and close it based on multiprocessing events.
"""
channels_index = np.array(channels) - 1
def audio_callback(indata, frames, timestamp, status) -> None:
"""
Low-level sounddevice callback.
"""
if status:
logging.warning(status)
# Slicing makes copy unnecessary
# Two separate queues are necessary because .get() also pops the data from the queue
# Remark: this also ensures that file-recorded data and chunk-audio data are the same.
record_queue.put_nowait(indata[:, channels_index])
read_queue.put_nowait(indata[:, channels_index])
# Create the audio stream
stream = sd.InputStream(
device=microphone_index,
samplerate=sample_rate,
channels=max(channels),
dtype="float32",
blocksize=0, # Varying input buffer length, but no additional latency
latency="low", # Low latency mode (not enabled by default !)
# never_drop_input=True, # Disabled as it generates an error for some devices
callback=audio_callback,
)
while True:
start_flag = stream_start_event.wait(timeout=1.0)
if stream_close_event.is_set():
break
elif not start_flag:
continue
stream.start()
is_started_event.set()
stream_stop_event.wait()
stream.stop() # stream.stop() waits for all buffers to be processed
# Remark : stream.abort() flushes the buffers !
stream.close()
def start_recording(
self,
output_file: str | None = None,
multiprocessing: bool | None = False,
overwrite: bool | None = True,
barrier: Barrier | None = None,
) -> None:
"""
Starts the recording of the microphone. If output_file is provided, the audio will be written to this file.
@@ -267,11 +322,12 @@ class PortAudioMicrophone(Microphone):
raise DeviceAlreadyRecordingError(f"Microphone {self.microphone_index} is already recording.")
# Reset queues
self.read_queue = thread_Queue()
if multiprocessing:
self.record_queue = process_Queue()
else:
self.record_queue = thread_Queue()
self._clear_queue(self.read_queue)
self._clear_queue(self.record_queue)
# Reset events - stream_start_event is already cleared here
self.stream_stop_event.clear()
self.stream_is_started_event.clear()
# Write recordings into a file if output_file is provided
if output_file is not None:
@@ -311,12 +367,22 @@ class PortAudioMicrophone(Microphone):
),
)
self.record_thread.daemon = True
self.record_thread.start()
self.is_writing = True
if barrier is None:
self.record_thread.start()
self.stream_start_event.set() # Start the input audio stream process
self.stream_is_started_event.wait() # Wait for the input audio stream process to be actually started
if barrier is not None:
barrier.wait() # Wait for multiple input audio streams to be started at the same time
self._clear_queue(self.read_queue)
self._clear_queue(self.record_queue)
if output_file is not None:
self.record_thread.start()
self.is_recording = True
self.stream.start()
def stop_recording(self) -> None:
"""
@@ -327,9 +393,9 @@ class PortAudioMicrophone(Microphone):
if not self.is_recording:
raise DeviceNotRecordingError(f"Microphone {self.microphone_index} is not recording.")
if self.stream.active:
self.stream.stop() # Wait for all buffers to be processed
# Remark : stream.abort() flushes the buffers !
if self.stream_process is not None:
self.stream_start_event.clear() # Ensures the stream is not started again !
self.stream_stop_event.set()
self.is_recording = False
if self.record_thread is not None:
@@ -340,17 +406,37 @@ class PortAudioMicrophone(Microphone):
self.record_stop_event = None
self.is_writing = False
self.logs["stop_timestamp"] = capture_timestamp_utc()
def disconnect(self) -> None:
@staticmethod
def _record_loop(queue, event: Event, sample_rate: int, channels: list[int], output_file: Path) -> None:
"""
Disconnects the microphone and stops the recording.
Thread/Process-safe loop to write audio data into a file.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
# Can only be run on a single process/thread for file writing safety
with SoundFile(
output_file,
mode="w",
samplerate=sample_rate,
channels=len(channels),
format="WAV",
subtype="FLOAT", # By default, a much lower quality WAV file is created !
) as file:
while not event.is_set():
try:
file.write(
queue.get(timeout=0.005)
) # Timeout set as the usual sounddevice buffer size. get_nowait is not possible here as it saturates the thread.
queue.task_done()
except Empty:
continue
if self.is_recording:
self.stop_recording()
self.stream.close()
self._is_connected = False
@staticmethod
def _clear_queue(queue):
"""
Clears the queue by getting all items until it is empty. The longer the queue, the longer it takes to clear it.
"""
try:
while True:
queue.get_nowait()
queue.task_done()
except Empty:
return

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from multiprocessing import Barrier
from queue import Queue
from threading import Thread
@@ -49,9 +50,11 @@ def async_microphones_start_recording(
if output_files is None:
output_files = [None] * len(microphones)
barrier = Barrier(len(microphones))
for microphone, output_file in zip(microphones.values(), output_files, strict=False):
start_recording_threads.append(
Thread(target=microphone.start_recording, args=(output_file, multiprocessing, overwrite))
Thread(target=microphone.start_recording, args=(output_file, multiprocessing, overwrite, barrier))
)
for thread in start_recording_threads: