# Source code for simvx.core.audio_backend

"""Miniaudio-based audio backend for SimVX.

Provides ``MiniaudioBackend`` — a real-time audio mixer that plays decoded
PCM via ``miniaudio.PlaybackDevice``.  Each active sound is tracked as a
``_Channel`` with its own gain, pan, pitch, loop flag, and cursor.  The
device callback mixes all active channels into a single interleaved S16
output buffer every audio period.

Duck-typed interface consumed by ``AudioStreamPlayer``, ``AudioStreamPlayer2D``,
and ``AudioStreamPlayer3D``.
"""


from __future__ import annotations

import logging
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

import numpy as np

if TYPE_CHECKING:
    from .audio import AudioStream

log = logging.getLogger(__name__)

__all__ = ["MiniaudioBackend"]

# ---------------------------------------------------------------------------
# Internal channel representation
# ---------------------------------------------------------------------------

_SAMPLE_RATE = 44100
_NCHANNELS = 2


@dataclass
class _Channel:
    """A single active voice in the mixer."""

    samples: np.ndarray  # float32, interleaved stereo (nframes * 2,)
    cursor: int = 0  # current sample-pair offset (in frames)
    total_frames: int = 0
    volume: float = 1.0  # linear gain
    pan: float = 0.0  # -1 left .. +1 right
    pitch: float = 1.0
    loop: bool = False
    paused: bool = False
    stopped: bool = False
    # For streaming mode
    streaming: bool = False
    stream_buffer: bytearray = field(default_factory=bytearray)


# Monotonic channel id counter
_next_id = 0
_id_lock = threading.Lock()


def _alloc_id() -> int:
    global _next_id
    with _id_lock:
        _next_id += 1
        return _next_id


# ---------------------------------------------------------------------------
# Volume helpers
# ---------------------------------------------------------------------------


def _db_to_linear(db: float) -> float:
    """Convert decibels to linear gain (0.0 .. ~15.85)."""
    if db <= -80.0:
        return 0.0
    return 10.0 ** (db / 20.0)


def _load_stream(stream: AudioStream) -> np.ndarray | None:
    """Decode an AudioStream into float32 interleaved stereo samples.

    Supports:
    - File paths (WAV/OGG/MP3/FLAC via miniaudio)
    - ``AudioStream.backend_data`` already containing an ndarray
    - Raw PCM bytes stored in ``backend_data``
    """
    import miniaudio

    # Already decoded
    if stream.backend_data is not None and isinstance(stream.backend_data, np.ndarray):
        return stream.backend_data

    path = stream.path
    if not path:
        return None

    try:
        decoded = miniaudio.decode_file(
            path,
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=_NCHANNELS,
            sample_rate=_SAMPLE_RATE,
        )
    except (miniaudio.DecodeError, FileNotFoundError, OSError) as exc:
        log.warning("audio_backend: failed to decode %r: %s", path, exc)
        return None

    # Convert S16 samples to float32 for mixing (-1.0 .. +1.0)
    raw = np.frombuffer(decoded.samples, dtype=np.int16).astype(np.float32) / 32768.0
    stream.backend_data = raw  # cache for next play()
    return raw


# ---------------------------------------------------------------------------
# MiniaudioBackend
# ---------------------------------------------------------------------------


class MiniaudioBackend:
    """Real-time audio mixer using miniaudio's PlaybackDevice.

    The backend owns a single playback device that runs a callback on a
    dedicated audio thread.  All public methods are thread-safe — they mutate
    ``_channels`` under a lock while the audio thread reads from it.

    Mixing assumes interleaved stereo (two samples per frame), matching the
    format of decoded ``_Channel.samples``.
    """

    def __init__(self, sample_rate: int = _SAMPLE_RATE, nchannels: int = _NCHANNELS):
        """Open the playback device and start the mixing callback.

        Args:
            sample_rate: Output sample rate in Hz.
            nchannels: Number of interleaved output channels.
        """
        import miniaudio

        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        self._channels: dict[int, _Channel] = {}

        self._device = miniaudio.PlaybackDevice(
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=nchannels,
            sample_rate=sample_rate,
            buffersize_msec=100,
        )
        gen = self._audio_callback()
        next(gen)  # prime the generator before handing it to miniaudio
        self._device.start(gen)

    # -- callback generator ---------------------------------------------------

    def _audio_callback(self):
        """Generator that miniaudio drives once per audio period.

        Priming with ``next()`` runs up to the first ``yield``.  After that,
        every ``send(num_frames)`` is answered by exactly one ``yield``
        carrying that period's mixed PCM (interleaved S16).  Using a single
        ``yield`` per loop iteration matters: with two, every second request
        would be answered with empty data and its frame count discarded.
        """
        num_frames = yield b""  # reached by the priming next(); receives the first request
        while True:
            if num_frames is None or num_frames <= 0:
                data = b""
            else:
                data = self._mix_period(num_frames)
            num_frames = yield data

    def _mix_period(self, num_frames: int) -> bytes:
        """Mix all active channels for one period and return S16 bytes."""
        mix = np.zeros(num_frames * self._nchannels, dtype=np.float32)

        with self._lock:
            finished: list[int] = []
            for ch_id, ch in self._channels.items():
                if ch.stopped:
                    finished.append(ch_id)
                    continue
                if ch.paused:
                    continue
                if ch.streaming:
                    self._mix_streaming(ch, mix, num_frames)
                    continue
                self._mix_channel(ch, mix, num_frames)
                if ch.stopped:
                    finished.append(ch_id)
            # Remove after iterating; the dict must not change size mid-loop.
            for ch_id in finished:
                del self._channels[ch_id]

        # Clip and convert to S16
        np.clip(mix, -1.0, 1.0, out=mix)
        return (mix * 32767).astype(np.int16).tobytes()

    @staticmethod
    def _pan_gains(ch: _Channel) -> tuple[float, float]:
        """Return ``(left_gain, right_gain)`` from the channel's volume and pan."""
        vol = ch.volume
        return vol * min(1.0, 1.0 - ch.pan), vol * min(1.0, 1.0 + ch.pan)

    def _mix_channel(self, ch: _Channel, mix: np.ndarray, num_frames: int) -> None:
        """Mix a decoded channel into the output buffer, handling pitch/loop.

        Advances ``ch.cursor`` and sets ``ch.stopped`` once a non-looping
        channel reaches its end.  A channel without any frames is stopped
        immediately, so a looping empty clip cannot spin forever.
        """
        if ch.total_frames <= 0:
            ch.stopped = True
            return

        pitch = max(0.1, ch.pitch)
        left_gain, right_gain = self._pan_gains(ch)

        if abs(pitch - 1.0) < 0.01:
            # Fast path: pitch ~= 1.0, direct copy
            self._mix_direct(ch, mix, num_frames, left_gain, right_gain)
        else:
            self._mix_resampled(ch, mix, num_frames, pitch, left_gain, right_gain)

    def _mix_direct(
        self, ch: _Channel, mix: np.ndarray, num_frames: int, left_gain: float, right_gain: float
    ) -> None:
        """Copy frames 1:1 into ``mix``, wrapping iteratively when looping."""
        samples = ch.samples
        n_total = ch.total_frames
        cursor = ch.cursor
        written = 0

        # Iterative rather than recursive: a very short looping clip may wrap
        # thousands of times within one period.
        while written < num_frames:
            if cursor >= n_total:
                if not ch.loop:
                    break
                cursor = 0
            count = min(num_frames - written, n_total - cursor)
            src = samples[cursor * 2 : (cursor + count) * 2]
            dst = mix[written * 2 : (written + count) * 2]  # view into mix
            dst[0::2] += src[0::2] * left_gain
            dst[1::2] += src[1::2] * right_gain
            cursor += count
            written += count

        ch.cursor = cursor
        if cursor >= n_total and not ch.loop:
            ch.stopped = True

    def _mix_resampled(
        self,
        ch: _Channel,
        mix: np.ndarray,
        num_frames: int,
        pitch: float,
        left_gain: float,
        right_gain: float,
    ) -> None:
        """Pitch-shifted playback via vectorised linear interpolation.

        Output frame ``k`` reads source position ``cursor + k * pitch``.
        Looping channels wrap positions modulo the clip length and
        interpolate from the last frame back to the first.  Non-looping
        channels render until the position passes the final frame and are
        then marked stopped.
        """
        n_total = ch.total_frames
        start = ch.cursor
        positions = start + pitch * np.arange(num_frames, dtype=np.float64)
        end_pos = start + pitch * num_frames

        if ch.loop:
            positions %= n_total
            n_out = num_frames
            ch.cursor = int(end_pos % n_total)
        else:
            # positions is increasing, so this counts the entries < n_total.
            n_out = int(np.searchsorted(positions, n_total, side="left"))
            positions = positions[:n_out]
            ch.cursor = min(int(end_pos), n_total)
            if ch.cursor >= n_total:
                ch.stopped = True

        if n_out == 0:
            return

        base = positions.astype(np.intp)
        frac = (positions - base).astype(np.float32)
        nxt = base + 1
        if ch.loop:
            nxt[nxt >= n_total] = 0
        else:
            np.minimum(nxt, n_total - 1, out=nxt)

        left = ch.samples[0::2]
        right = ch.samples[1::2]
        l0 = left[base]
        r0 = right[base]
        mix[0 : n_out * 2 : 2] += (l0 + frac * (left[nxt] - l0)) * left_gain
        mix[1 : n_out * 2 : 2] += (r0 + frac * (right[nxt] - r0)) * right_gain

    def _mix_streaming(self, ch: _Channel, mix: np.ndarray, num_frames: int) -> None:
        """Mix buffered streaming data (S16 PCM bytes) into the output.

        Consumes at most one period of whole frames from ``ch.stream_buffer``.
        If fewer are buffered, the rest of the period stays silent for this
        channel.
        """
        buf = ch.stream_buffer
        frame_bytes = self._nchannels * 2  # S16 = 2 bytes per sample
        available = min(len(buf), num_frames * frame_bytes)
        # Truncate to frame boundary
        available -= available % frame_bytes
        if available <= 0:
            return

        # The slice copies the bytes, so shrinking ``buf`` below is safe.
        raw = np.frombuffer(buf[:available], dtype=np.int16).astype(np.float32) / 32768.0
        left_gain, right_gain = self._pan_gains(ch)
        raw[0::2] *= left_gain
        raw[1::2] *= right_gain
        mix[: len(raw)] += raw
        del buf[:available]

    # -- public interface (called by audio nodes) ------------------------------

    def play_audio(
        self,
        stream: AudioStream,
        *,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "master",
    ) -> int | None:
        """Decode and play an audio stream.

        Returns:
            The new channel ID, or ``None`` if the stream could not be loaded.
        """
        samples = _load_stream(stream)
        if samples is None:
            return None

        ch = _Channel(
            samples=samples,
            total_frames=len(samples) // self._nchannels,
            volume=self._bus_volume(volume_db, bus),
            pitch=pitch,
            loop=loop,
        )
        ch_id = _alloc_id()
        with self._lock:
            self._channels[ch_id] = ch
        return ch_id

    def play_audio_2d(
        self,
        stream: AudioStream,
        *,
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "sfx",
        max_distance: float = 2000.0,
    ) -> int | None:
        """Play a 2D-positioned sound.

        ``position`` and ``max_distance`` are accepted but not used here;
        spatialization is updated per-frame via ``update_audio_2d``.
        """
        return self.play_audio(stream, volume_db=volume_db, pitch=pitch, loop=loop, bus=bus)

    def play_audio_3d(
        self,
        stream: AudioStream,
        *,
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "sfx",
        max_distance: float = 100.0,
    ) -> int | None:
        """Play a 3D-positioned sound.

        ``position`` and ``max_distance`` are accepted but not used here;
        spatialization is updated per-frame via ``update_audio_3d``.
        """
        return self.play_audio(stream, volume_db=volume_db, pitch=pitch, loop=loop, bus=bus)

    def stop_audio(self, channel_id: int) -> None:
        """Stop a playing channel; unknown IDs are ignored."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.stopped = True

    def pause_audio(self, channel_id: int) -> None:
        """Pause a playing channel; unknown IDs are ignored."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.paused = True

    def resume_audio(self, channel_id: int) -> None:
        """Resume a paused channel; unknown IDs are ignored."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.paused = False

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """Update volume and pan for a 2D channel (called each frame).

        ``pan`` is clamped to ``[-1, 1]``.
        """
        # NOTE(review): unlike play_audio, the bus volume is not re-applied
        # here — confirm callers fold it into ``volume_db``.
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.volume = _db_to_linear(volume_db)
                ch.pan = max(-1.0, min(1.0, pan))

    def update_audio_3d(self, channel_id: int, volume_db: float, pan: float, pitch: float) -> None:
        """Update volume, pan, and pitch for a 3D channel (called each frame).

        ``pan`` is clamped to ``[-1, 1]`` and ``pitch`` to at least 0.1.
        """
        # NOTE(review): unlike play_audio, the bus volume is not re-applied
        # here — confirm callers fold it into ``volume_db``.
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.volume = _db_to_linear(volume_db)
                ch.pan = max(-1.0, min(1.0, pan))
                ch.pitch = max(0.1, pitch)

    def get_playback_position(self, channel_id: int) -> float:
        """Return current playback position in seconds (0.0 for unknown IDs)."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                return ch.cursor / self._sample_rate
        return 0.0

    def open_stream(self, *, volume_db: float = 0.0, pitch: float = 1.0, bus: str = "master") -> int:
        """Open a streaming channel that accepts raw PCM chunks via ``feed_audio_chunk``."""
        ch = _Channel(
            samples=np.empty(0, dtype=np.float32),
            streaming=True,
            volume=self._bus_volume(volume_db, bus),
            pitch=pitch,
        )
        ch_id = _alloc_id()
        with self._lock:
            self._channels[ch_id] = ch
        return ch_id

    def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
        """Append raw PCM bytes to a streaming channel's buffer.

        The mixer reads the buffer as interleaved signed 16-bit samples.
        Chunks for unknown or non-streaming channels are dropped.
        """
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None and ch.streaming:
                ch.stream_buffer.extend(chunk)

    def shutdown(self) -> None:
        """Stop the playback device and release resources."""
        try:
            self._device.close()
        except Exception:
            # Best-effort teardown: a failing close must not prevent cleanup.
            log.debug("Failed to close audio device", exc_info=True)
        with self._lock:
            self._channels.clear()

    # -- helpers ---------------------------------------------------------------

    def _bus_volume(self, volume_db: float, bus: str) -> float:
        """Compute linear volume incorporating the audio bus layout.

        A bus name the default layout does not know contributes 0 dB.
        """
        from .audio_bus import AudioBusLayout

        layout = AudioBusLayout.get_default()
        bus_db = layout.get_effective_volume(bus) if layout.get_bus(bus) else 0.0
        return _db_to_linear(volume_db + bus_db)