"""Miniaudio-based audio backend for SimVX.
Provides ``MiniaudioBackend`` — a real-time audio mixer that plays decoded
PCM via ``miniaudio.PlaybackDevice``. Each active sound is tracked as a
``_Channel`` with its own gain, pan, pitch, loop flag, and cursor. The
device callback mixes all active channels into a single interleaved S16
output buffer every audio period.
Duck-typed interface consumed by ``AudioStreamPlayer``, ``AudioStreamPlayer2D``,
and ``AudioStreamPlayer3D``.
"""
from __future__ import annotations
import logging
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import numpy as np
if TYPE_CHECKING:
from .audio import AudioStream
log = logging.getLogger(__name__)
__all__ = ["MiniaudioBackend"]
# ---------------------------------------------------------------------------
# Internal channel representation
# ---------------------------------------------------------------------------
# Default device sample rate in Hz; ``_load_stream`` also decodes files to this rate.
_SAMPLE_RATE = 44100
# Default channel count (interleaved stereo); ``_load_stream`` also decodes to this layout.
_NCHANNELS = 2
@dataclass
class _Channel:
    """Mixer state for one active voice.

    Field order doubles as the positional constructor order, so it is kept
    stable.

    Attributes:
        samples: Decoded PCM as float32, interleaved stereo, shape
            ``(nframes * 2,)``.
        cursor: Current playback offset in frames (sample pairs).
        total_frames: Number of frames available in ``samples``.
        volume: Linear gain.
        pan: Stereo position from -1 (left) to +1 (right).
        pitch: Playback-rate multiplier.
        loop: Restart from the beginning when the end is reached.
        paused: Skipped by the mixer while set.
        stopped: Marks the voice for removal by the mixer.
        streaming: Voice is fed from ``stream_buffer`` rather than ``samples``.
        stream_buffer: Pending raw PCM bytes for streaming mode.
    """

    samples: np.ndarray
    cursor: int = 0
    total_frames: int = 0
    volume: float = 1.0
    pan: float = 0.0
    pitch: float = 1.0
    loop: bool = False
    paused: bool = False
    stopped: bool = False
    # Streaming mode
    streaming: bool = False
    # default_factory gives every voice its own buffer instead of a shared one
    stream_buffer: bytearray = field(default_factory=bytearray)
# Monotonic channel id counter
_next_id = 0
_id_lock = threading.Lock()


def _alloc_id() -> int:
    """Return the next channel id; ids start at 1 and increase by one per call.

    The counter is module-wide and guarded by ``_id_lock`` so concurrent
    callers never receive the same id.
    """
    global _next_id
    with _id_lock:
        allocated = _next_id + 1
        _next_id = allocated
    return allocated
# ---------------------------------------------------------------------------
# Volume helpers
# ---------------------------------------------------------------------------
def _db_to_linear(db: float) -> float:
    """Convert a level in decibels to a linear gain factor.

    Levels at or below -80 dB are treated as silence and map to exactly 0.0;
    everything else maps to ``10 ** (db / 20)`` with no upper clamp.
    """
    is_silent = db <= -80.0
    return 0.0 if is_silent else 10.0 ** (db / 20.0)
def _load_stream(stream: AudioStream) -> np.ndarray | None:
    """Return float32 interleaved samples for ``stream``, decoding its file if needed.

    Resolution order:

    1. If ``stream.backend_data`` is already an ``ndarray`` it is returned
       as-is (no copy; dtype and layout are not validated).
    2. Otherwise ``stream.path`` is decoded with ``miniaudio.decode_file``
       to signed 16-bit PCM at ``_SAMPLE_RATE`` / ``_NCHANNELS``, scaled to
       float32 by dividing by 32768, and cached on ``stream.backend_data``
       for the next call.

    Any other kind of ``backend_data`` (for example raw PCM ``bytes``) is
    not interpreted here; only ``path`` is consulted in that case.

    Returns:
        The sample array, or ``None`` when the stream has no path or the
        file cannot be decoded (a warning is logged for decode failures).
    """
    cached = stream.backend_data
    if isinstance(cached, np.ndarray):
        return cached

    path = stream.path
    if not path:
        return None

    # Imported only once a decode is really required, so cached or path-less
    # streams never depend on the decoder being importable.
    import miniaudio

    try:
        decoded = miniaudio.decode_file(
            path,
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=_NCHANNELS,
            sample_rate=_SAMPLE_RATE,
        )
    except (miniaudio.DecodeError, FileNotFoundError, OSError) as exc:
        log.warning("audio_backend: failed to decode %r: %s", path, exc)
        return None

    # S16 -> float32 so the mixer can sum voices before clipping.
    raw = np.frombuffer(decoded.samples, dtype=np.int16).astype(np.float32) / 32768.0
    stream.backend_data = raw  # cache for next play()
    return raw
# ---------------------------------------------------------------------------
# MiniaudioBackend
# ---------------------------------------------------------------------------
[docs]
class MiniaudioBackend:
"""Real-time audio mixer using miniaudio's PlaybackDevice.
The backend owns a single playback device that runs a callback on a
dedicated audio thread. All public methods are thread-safe — they
mutate ``_channels`` under a lock while the audio thread reads from it.
"""
def __init__(self, sample_rate: int = _SAMPLE_RATE, nchannels: int = _NCHANNELS):
    """Open a signed-16-bit playback device and start the mixing generator.

    Args:
        sample_rate: Device sample rate passed to ``miniaudio.PlaybackDevice``.
        nchannels: Number of interleaved output channels.
    """
    import miniaudio

    # Mixer state shared between the public API and the audio callback.
    self._lock = threading.Lock()
    self._channels: dict[int, _Channel] = {}
    self._nchannels = nchannels
    self._sample_rate = sample_rate

    device_options = dict(
        output_format=miniaudio.SampleFormat.SIGNED16,
        nchannels=nchannels,
        sample_rate=sample_rate,
        buffersize_msec=100,
    )
    self._device = miniaudio.PlaybackDevice(**device_options)

    # The generator has to be advanced to its first ``yield`` before the
    # device can ``send`` frame counts into it.
    mixer = self._audio_callback()
    next(mixer)
    self._device.start(mixer)
# -- callback generator ---------------------------------------------------
def _audio_callback(self):
    """Generator driven by the playback device, one ``send`` per audio period.

    Protocol: the generator is primed with ``next()`` (which returns
    ``b""``); afterwards every ``send(num_frames)`` must return the PCM for
    exactly that period. The frame count for the *next* period therefore
    arrives as the result of the ``yield`` that delivered the current one.

    Each period, under ``self._lock``:

    * stopped channels are removed from ``self._channels``;
    * paused channels are skipped;
    * streaming channels go through ``_mix_streaming``, all others through
      ``_mix_channel`` (and are removed if that call marked them stopped).

    The float mix is clipped to [-1, 1] and returned as interleaved S16
    bytes. A missing or non-positive frame count yields ``b""``.
    """
    # Priming yield: ``next(gen)`` stops here, the first ``send`` resumes it.
    num_frames = yield b""
    while True:
        if num_frames is None or num_frames <= 0:
            num_frames = yield b""
            continue

        mix = np.zeros(num_frames * self._nchannels, dtype=np.float32)
        with self._lock:
            finished: list[int] = []
            for ch_id, ch in self._channels.items():
                if ch.stopped:
                    finished.append(ch_id)
                    continue
                if ch.paused:
                    continue
                if ch.streaming:
                    self._mix_streaming(ch, mix, num_frames)
                    continue
                self._mix_channel(ch, mix, num_frames)
                if ch.stopped:
                    finished.append(ch_id)
            # Deleting after the loop avoids mutating the dict while iterating.
            for ch_id in finished:
                del self._channels[ch_id]

        # Clip and convert to S16
        np.clip(mix, -1.0, 1.0, out=mix)
        pcm = (mix * 32767).astype(np.int16).tobytes()
        # Hand this period's audio back and receive the next frame count in
        # the same step, so no period is answered with empty data.
        num_frames = yield pcm
def _mix_channel(self, ch: _Channel, mix: np.ndarray, num_frames: int) -> None:
    """Add up to ``num_frames`` frames of a decoded channel into ``mix``.

    ``mix`` is an interleaved stereo float buffer that is accumulated into
    (never overwritten). Left/right gain is ``volume`` scaled by
    ``min(1, 1 - pan)`` and ``min(1, 1 + pan)`` respectively.

    Two paths, chosen by the clamped pitch (minimum 0.1):

    * pitch within 0.01 of 1.0: frames are copied directly, wrapping to the
      start as many times as needed when ``ch.loop`` is set;
    * otherwise: frames are resampled with linear interpolation at
      ``cursor + k * pitch``; looping voices interpolate across the seam
      (last frame to first frame), one-shot voices hold the last frame.

    Side effects: advances ``ch.cursor`` (whole frames only, so the
    fractional resampling position is dropped between calls) and sets
    ``ch.stopped`` when a one-shot voice reaches its end or the clip is
    empty.
    """
    samples = ch.samples
    n_total = ch.total_frames
    if n_total <= 0:
        # An empty clip can never produce audio; retire it instead of
        # wrapping forever (looping) or indexing past the end (pitched).
        ch.stopped = True
        return
    if num_frames <= 0:
        return

    # Compute left/right gain from volume and pan
    left_gain = ch.volume * min(1.0, 1.0 - ch.pan)
    right_gain = ch.volume * min(1.0, 1.0 + ch.pan)
    pitch = max(0.1, ch.pitch)
    cursor = ch.cursor

    if abs(pitch - 1.0) < 0.01:
        # Fast path: copy runs of frames, wrapping iteratively so a very
        # short looping clip cannot exhaust the recursion limit.
        written = 0
        while written < num_frames:
            if cursor >= n_total:
                if not ch.loop:
                    break
                cursor = 0
            count = min(num_frames - written, n_total - cursor)
            src = samples[cursor * 2 : (cursor + count) * 2]
            dst = mix[written * 2 : (written + count) * 2]
            # Scaling produces new arrays, so the (possibly cached and
            # shared) source samples are never modified.
            dst[0::2] += src[0::2] * left_gain
            dst[1::2] += src[1::2] * right_gain
            cursor += count
            written += count
        ch.cursor = cursor
        if cursor >= n_total and not ch.loop:
            ch.stopped = True
        return

    # Pitch-shifted playback via linear interpolation.
    positions = cursor + pitch * np.arange(num_frames, dtype=np.float64)
    end_pos = cursor + pitch * num_frames
    if ch.loop:
        positions %= n_total
        count = num_frames
        ch.cursor = int(end_pos % n_total)
    else:
        # Positions increase monotonically, so the playable ones are a prefix.
        count = int(np.count_nonzero(positions < n_total))
        positions = positions[:count]
        ch.cursor = min(int(end_pos), n_total)
        if ch.cursor >= n_total:
            ch.stopped = True
    if count == 0:
        return

    base = positions.astype(np.intp)  # positions are non-negative: truncation == floor
    frac = (positions - base).astype(np.float32)
    following = base + 1
    if ch.loop:
        following %= n_total  # interpolate across the loop seam
    else:
        np.minimum(following, n_total - 1, out=following)  # hold the last frame

    left = samples[0::2]
    right = samples[1::2]
    start_l = left[base]
    start_r = right[base]
    out = mix[: count * 2]
    out[0::2] += (start_l + frac * (left[following] - start_l)) * left_gain
    out[1::2] += (start_r + frac * (right[following] - start_r)) * right_gain
def _mix_streaming(self, ch: _Channel, mix: np.ndarray, num_frames: int) -> None:
    """Consume buffered S16 PCM from a streaming channel and add it to ``mix``.

    At most ``num_frames`` frames are taken from the front of
    ``ch.stream_buffer``; a trailing partial frame is left in the buffer.
    When fewer than 4 bytes are available nothing is mixed or consumed.
    The consumed bytes are scaled by the channel's volume/pan gains and
    removed from the buffer.
    """
    pending = ch.stream_buffer
    frame_bytes = self._nchannels * 2  # S16 = 2 bytes per sample
    usable = min(len(pending), num_frames * frame_bytes)
    if usable < 4:
        return

    # Drop any trailing partial frame from this round
    usable -= usable % frame_bytes

    # Slicing the bytearray copies it, so shrinking ``pending`` below is safe.
    pcm = np.frombuffer(pending[:usable], dtype=np.int16).astype(np.float32) / 32768.0

    gain = ch.volume
    pcm[0::2] *= gain * min(1.0, 1.0 - ch.pan)
    pcm[1::2] *= gain * min(1.0, 1.0 + ch.pan)

    mix[: len(pcm)] += pcm
    del pending[:usable]
# -- public interface (called by audio nodes) ------------------------------
[docs]
def play_audio(
    self,
    stream: AudioStream,
    *,
    volume_db: float = 0.0,
    pitch: float = 1.0,
    loop: bool = False,
    bus: str = "master",
) -> int | None:
    """Start playing ``stream`` and return its channel id.

    The stream is resolved through ``_load_stream``; when that yields no
    samples, ``None`` is returned and nothing is registered. The initial
    gain combines ``volume_db`` with the bus level via ``_bus_volume``.
    """
    decoded = _load_stream(stream)
    if decoded is None:
        return None

    frame_count = len(decoded) // self._nchannels
    gain = self._bus_volume(volume_db, bus)
    voice = _Channel(
        samples=decoded,
        total_frames=frame_count,
        volume=gain,
        pitch=pitch,
        loop=loop,
    )

    new_id = _alloc_id()
    # Registration is the only step that touches state shared with the audio thread.
    with self._lock:
        self._channels[new_id] = voice
    return new_id
[docs]
def play_audio_2d(
    self,
    stream: AudioStream,
    *,
    position: Any = None,
    volume_db: float = 0.0,
    pitch: float = 1.0,
    loop: bool = False,
    bus: str = "sfx",
    max_distance: float = 2000.0,
) -> int | None:
    """Start a 2D-positioned sound and return its channel id.

    ``position`` and ``max_distance`` are accepted but not used here; the
    call is forwarded to ``play_audio`` and spatialization is expected to be
    applied per-frame through ``update_audio_2d``.
    """
    forwarded = {
        "volume_db": volume_db,
        "pitch": pitch,
        "loop": loop,
        "bus": bus,
    }
    return self.play_audio(stream, **forwarded)
[docs]
def play_audio_3d(
    self,
    stream: AudioStream,
    *,
    position: Any = None,
    volume_db: float = 0.0,
    pitch: float = 1.0,
    loop: bool = False,
    bus: str = "sfx",
    max_distance: float = 100.0,
) -> int | None:
    """Start a 3D-positioned sound and return its channel id.

    ``position`` and ``max_distance`` are accepted but not used here; the
    call is forwarded to ``play_audio`` and spatialization is expected to be
    applied per-frame through ``update_audio_3d``.
    """
    forwarded = {
        "volume_db": volume_db,
        "pitch": pitch,
        "loop": loop,
        "bus": bus,
    }
    return self.play_audio(stream, **forwarded)
[docs]
def stop_audio(self, channel_id: int) -> None:
    """Flag a channel as stopped; unknown ids are ignored.

    The channel is only marked here. Removal from ``_channels`` is left to
    the audio callback.
    """
    with self._lock:
        target = self._channels.get(channel_id)
        if not target:
            return
        target.stopped = True
[docs]
def pause_audio(self, channel_id: int) -> None:
    """Flag a channel as paused so the mixer skips it; unknown ids are ignored."""
    with self._lock:
        target = self._channels.get(channel_id)
        if not target:
            return
        target.paused = True
[docs]
def resume_audio(self, channel_id: int) -> None:
    """Clear a channel's paused flag; unknown ids are ignored."""
    with self._lock:
        target = self._channels.get(channel_id)
        if not target:
            return
        target.paused = False
[docs]
def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
    """Set a channel's gain and pan; unknown ids are ignored.

    ``volume_db`` is converted with ``_db_to_linear`` and ``pan`` is clamped
    to [-1, 1].
    """
    # NOTE(review): this replaces the gain set by play_audio, which included
    # the bus level via _bus_volume -- confirm callers fold the bus level
    # into volume_db themselves.
    with self._lock:
        target = self._channels.get(channel_id)
        if not target:
            return
        target.volume = _db_to_linear(volume_db)
        target.pan = max(-1.0, min(1.0, pan))
[docs]
def update_audio_3d(self, channel_id: int, volume_db: float, pan: float, pitch: float) -> None:
    """Set a channel's gain, pan and pitch; unknown ids are ignored.

    ``volume_db`` is converted with ``_db_to_linear``, ``pan`` is clamped to
    [-1, 1] and ``pitch`` is floored at 0.1.
    """
    # NOTE(review): this replaces the gain set by play_audio, which included
    # the bus level via _bus_volume -- confirm callers fold the bus level
    # into volume_db themselves.
    with self._lock:
        target = self._channels.get(channel_id)
        if not target:
            return
        target.volume = _db_to_linear(volume_db)
        target.pan = max(-1.0, min(1.0, pan))
        target.pitch = max(0.1, pitch)
[docs]
def get_playback_position(self, channel_id: int) -> float:
    """Return a channel's cursor converted to seconds, or 0.0 for unknown ids.

    The value is ``cursor / sample_rate`` using the backend's configured
    sample rate.
    """
    with self._lock:
        voice = self._channels.get(channel_id)
        return voice.cursor / self._sample_rate if voice else 0.0
[docs]
def open_stream(self, *, volume_db: float = 0.0, pitch: float = 1.0, bus: str = "master") -> int:
    """Register an empty streaming channel and return its id.

    Audio for the channel is supplied afterwards through
    ``feed_audio_chunk``. The initial gain combines ``volume_db`` with the
    bus level via ``_bus_volume``.
    """
    no_samples = np.empty(0, dtype=np.float32)
    gain = self._bus_volume(volume_db, bus)
    voice = _Channel(samples=no_samples, streaming=True, volume=gain, pitch=pitch)

    new_id = _alloc_id()
    with self._lock:
        self._channels[new_id] = voice
    return new_id
[docs]
def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
    """Queue raw PCM bytes on a streaming channel.

    The call is a no-op when the id is unknown or the channel is not in
    streaming mode.
    """
    with self._lock:
        target = self._channels.get(channel_id)
        if not target or not target.streaming:
            return
        target.stream_buffer.extend(chunk)
[docs]
def shutdown(self) -> None:
    """Close the playback device, then drop every registered channel.

    Closing is best-effort: a failure is logged at debug level and the
    channel table is still cleared.
    """
    device = self._device
    try:
        device.close()
    except Exception:
        log.debug("Failed to close audio device", exc_info=True)

    with self._lock:
        self._channels.clear()
# -- helpers ---------------------------------------------------------------
def _bus_volume(self, volume_db: float, bus: str) -> float:
    """Return the linear gain for ``volume_db`` combined with the bus level.

    The bus contribution is ``get_effective_volume(bus)`` from the default
    ``AudioBusLayout`` when ``get_bus(bus)`` is truthy, and 0 dB otherwise.
    """
    from .audio_bus import AudioBusLayout

    layout = AudioBusLayout.get_default()
    if layout.get_bus(bus):
        bus_db = layout.get_effective_volume(bus)
    else:
        bus_db = 0.0
    return _db_to_linear(volume_db + bus_db)