Source code for simvx.core.audio_backend._miniaudio

"""The native (``ma_engine``) production audio backend.

``MiniaudioBackend`` mixes, spatializes, and runs DSP entirely in native C via
the ``simvx.core._native.miniaudio_engine`` CFFI wrapper. Python only pushes
parameter updates. Falls back to the legacy / null backends via
``make_backend`` when the compiled extension is unavailable.
"""

from __future__ import annotations

import threading
from typing import TYPE_CHECKING, Any

import numpy as np

from .._native import miniaudio_engine as _me
from ..audio_errors import (
    AudioCapabilityError,
    AudioError,
    InvalidStreamError,
    raise_or_warn,
)
from ..audio_protocol import Capability
from ._effects import _NativeEffectChain
from ._shared import (
    _DEFAULT_CHANNELS,
    _DEFAULT_SAMPLE_RATE,
    _DESKTOP_CAPABILITIES,
    _NATIVE_EFFECT_CAPABILITIES,
    _alloc_channel_id,
    _db_to_linear,
    _register_atexit_shutdown,
    _SoundEntry,
    _warn_if_no_vorbis,
)

if TYPE_CHECKING:
    from ..audio import AudioClip
    from ..audio_bus import AudioBusLayout


[docs] class MiniaudioBackend: """Native-mixed audio backend on top of ``ma_engine``. All mixing, spatialization, and DSP runs in native C: Python only pushes parameter updates. Target latency: ``buffersize_msec=20`` (vs the legacy backend's 100 ms). The native extension must be built once after install: simvx build-audio If the extension is missing, the constructor raises ``MiniaudioEngineUnavailable``. Use ``make_backend()`` for an automatic fallback to ``_LegacyMiniaudioBackend`` with a one-time warning. """ def __init__( self, sample_rate: int = _DEFAULT_SAMPLE_RATE, nchannels: int = _DEFAULT_CHANNELS, ): self._sample_rate = sample_rate self._nchannels = nchannels self._lock = threading.Lock() self._sounds: dict[int, _SoundEntry] = {} self._groups: dict[str, _me.SoundGroup] = {} # Per-bus native effect chains (one per group). Built lazily on # first sync_bus_layout that sees a non-empty effects list. self._effect_chains: dict[str, _NativeEffectChain] = {} # Snapshot of (volume_db, mute, send_to) per bus so sync_bus_layout # is cheap to re-call every frame: only push to native on change. self._bus_snapshot: dict[str, tuple[float, bool, str]] = {} _warn_if_no_vorbis() self._engine = _me.Engine( sample_rate=sample_rate, channels=nchannels, device=True ) # Seed default groups so unfamiliar bus names play at master gain. self._ensure_default_groups() # Belt-and-braces: register an atexit hook so a `sys.exit(0)` from # anywhere (test runners, demos that bypass App.quit()) still # shuts down the engine and joins miniaudio's audio thread. # Captured by weakref so the hook doesn't pin the backend alive. _register_atexit_shutdown(self) # -- internal helpers ----------------------------------------------------- def _ensure_default_groups(self) -> None: from ..audio_bus import AudioBusLayout layout = AudioBusLayout.get_default() self.sync_bus_layout(layout) def _ensure_group(self, bus_name: str) -> _me.SoundGroup: """Return the SoundGroup for ``bus_name``, lazily creating if absent. Bus names must match the active :class:`AudioBusLayout` exactly: lookup is case-sensitive. Unknown buses raise :class:`UnknownBusError` so typos surface at the call site rather than playing on a phantom root-attached group. """ group = self._groups.get(bus_name) if group is not None: return group # Validate against the active layout before allocating native state. from ..audio_bus import AudioBusLayout layout = AudioBusLayout.get_default() layout.get_bus(bus_name) # raises UnknownBusError on miss try: group = _me.SoundGroup(self._engine) except Exception as exc: raise AudioError( f"Failed to create native group for bus {bus_name!r}" ) from exc self._groups[bus_name] = group return group def _make_sound( self, stream: AudioClip, *, bus: str, spatial: bool, pitch_enabled: bool, ) -> tuple[Any, Any] | tuple[None, None]: """Build a ``_me.Sound`` for `stream`. Returns (sound, keeper). `keeper` pins an ``AudioBuffer`` when the source is an ndarray; ``None`` when loaded from a file path. """ group = self._ensure_group(bus) backend_data = getattr(stream, "backend_data", None) if isinstance(backend_data, np.ndarray): try: buf = _me.AudioBuffer( backend_data, sample_rate=self._sample_rate, channels=self._nchannels, ) sound = _me.Sound.from_buffer( self._engine, buf, group=group, spatial=spatial, pitch_enabled=pitch_enabled, ) return sound, buf except Exception as exc: raise InvalidStreamError( f"Failed to play ndarray source: {exc}" ) from exc path = getattr(stream, "path", None) if path: try: sound = _me.Sound.from_file( self._engine, path, group=group, spatial=spatial, pitch_enabled=pitch_enabled, ) return sound, None except Exception as exc: raise InvalidStreamError( f"Failed to play file {path!r}: {exc}" ) from exc return None, None # -- public interface -----------------------------------------------------
[docs] def play_audio( self, stream: AudioClip, *, mode: str = "non_positional", position: Any = None, volume_db: float = 0.0, pitch: float = 1.0, loop: bool = False, bus: str = "Master", max_distance: float = 100.0, from_position: float = 0.0, pan: float = 0.0, gain_db: float = 0.0, ) -> int | None: # 2D / 3D players compute ``pan`` + ``volume_db`` Python-side and # forward them here so the native spatializer applies them before # the first buffer (matches web backend behaviour exactly). HRTF # is intentionally not used for the 2D path: wrong shape for 2D # scenes, so the native ``spatial`` flag stays False; ``mode`` is # accepted for Protocol parity but the native ma_engine path does # not currently branch on it. del mode, position, max_distance sound, keeper = self._make_sound( stream, bus=bus, spatial=False, pitch_enabled=True ) if sound is None: return None sound.set_volume(_db_to_linear(volume_db + gain_db)) sound.set_pitch(max(0.1, float(pitch))) sound.set_looping(bool(loop)) # Apply pan *before* ``sound.start()`` so the audio thread starts # mixing with the correct pan on its very first buffer. Without # this, spatial players play centred for one buffer (~20 ms on the # native path) before the per-frame ``update_audio_2d/3d`` lands. sound.set_pan(max(-1.0, min(1.0, float(pan)))) if from_position > 0.0: try: sound.seek_to_frame(int(float(from_position) * self._sample_rate)) except Exception as exc: sound.shutdown() raise AudioError( f"seek_to_frame failed for from_position={from_position}: {exc}" ) from exc try: sound.start() except Exception as exc: sound.shutdown() raise AudioError(f"sound.start failed: {exc}") from exc cid = _alloc_channel_id() with self._lock: self._sounds[cid] = _SoundEntry(sound=sound, bus=bus, keeper=keeper) return cid
[docs] def stop_audio(self, channel_id: int) -> None: with self._lock: entry = self._sounds.pop(channel_id, None) if entry is None: return try: entry.sound.stop() except Exception as exc: raise_or_warn( exc, key="audio.native.stop_audio.sound_stop_failed", message="MiniaudioBackend.stop_audio: sound.stop failed", ) entry.sound.shutdown()
[docs] def pause_audio(self, channel_id: int) -> None: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return try: entry.sound.stop() entry.paused = True except Exception as exc: raise_or_warn( exc, key="audio.native.pause_audio.sound_stop_failed", message="MiniaudioBackend.pause_audio: sound.stop failed", )
[docs] def resume_audio(self, channel_id: int) -> None: with self._lock: entry = self._sounds.get(channel_id) if entry is None or not entry.paused: return try: entry.sound.start() entry.paused = False except Exception as exc: raise_or_warn( exc, key="audio.native.resume_audio.sound_start_failed", message="MiniaudioBackend.resume_audio: sound.start failed", )
[docs] def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return entry.sound.set_volume(_db_to_linear(volume_db)) entry.sound.set_pan(max(-1.0, min(1.0, float(pan))))
[docs] def update_audio_3d( self, channel_id: int, volume_db: float, pan: float, pitch: float ) -> None: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return entry.sound.set_volume(_db_to_linear(volume_db)) entry.sound.set_pan(max(-1.0, min(1.0, float(pan)))) entry.sound.set_pitch(max(0.1, float(pitch)))
[docs] def set_pitch(self, channel_id: int, pitch: float) -> None: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return if entry.streaming: # Native streaming sounds are built with ``pitch_enabled=False`` # (the resampler would read past what the producer has written); # surface that as a typed capability error rather than silently # dropping the change. raise AudioCapabilityError( "pitch.streaming", backend="MiniaudioBackend", advertised=self.list_capabilities(), remediation=( "Pitch modulation is not supported on streaming sounds. " "Use AudioPlayer with stream_mode='memory' for " "pitch control." ), ) entry.sound.set_pitch(max(0.1, float(pitch)))
[docs] def get_playback_position(self, channel_id: int) -> float: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return 0.0 return entry.sound.cursor_frames() / float(self._sample_rate)
[docs] def is_channel_active(self, channel_id: int) -> bool: with self._lock: entry = self._sounds.get(channel_id) if entry is None: return False if entry.paused: return True return entry.sound.is_playing() and not entry.sound.at_end()
[docs] def open_stream( self, *, volume_db: float = 0.0, bus: str = "Master", buffer_seconds: float = 0.5, stream: AudioClip | None = None, ) -> int: """Open a streaming channel. ``buffer_seconds`` (default 0.5 s) trades latency for underrun tolerance. At 48 kHz stereo s16 = ~96 KB. Drop to 0.1 s for low-latency interactive synthesis; bump to 1-2 s for music streams under heavy CPU load. Producer pushes int16 stereo bytes via ``feed_audio_chunk``; underrun produces silence (no glitch). Only applies to the chunk-fed PCM path: when ``stream`` is a compressed container, miniaudio's ``ma_sound_init_from_file(..., MA_SOUND_FLAG_STREAM)`` owns the buffering internally. Container routing (when ``stream`` is provided): * ``"wav"`` / ``"pcm"`` (synthetic) / ``None``: open an ``ma_pcm_rb``-backed sound. Caller feeds raw int16 stereo bytes. * ``"ogg"`` / ``"mp3"`` / ``"flac"``: open the file directly via ``Sound.from_file(stream=True)``. Subsequent :meth:`feed_audio_chunk` calls are no-ops for this channel. * ``"unknown"``: raise :class:`InvalidStreamError`. Pitch is intentionally not a parameter: streaming sounds are built with ``pitch_enabled=False`` so the resampler can't read past what the producer has written. ``set_pitch`` against the returned channel raises :class:`AudioCapabilityError`. """ group = self._ensure_group(bus) container = stream.container if stream is not None else "wav" if container in ("ogg", "mp3", "flac"): path = stream.path if stream is not None else "" if not path: raise InvalidStreamError( f"Native streaming open: compressed container {container!r} " "requires a file path on the AudioClip" ) try: sound = _me.Sound.from_file( self._engine, path, group=group, stream=True, pitch_enabled=False ) except Exception as exc: raise AudioError( f"open_stream failed for {container} file {path!r}: {exc}" ) from exc sound.set_volume(_db_to_linear(volume_db)) try: sound.start() except Exception as exc: sound.shutdown() raise AudioError(f"open_stream sound.start failed: {exc}") from exc cid = _alloc_channel_id() with self._lock: # ``keeper`` is None: there's no Python-side rb to pin. The # ma_sound itself owns the decoder + buffering. ``streaming`` # stays True so ``set_pitch`` raises the same # ``AudioCapabilityError`` as the chunk-fed path. self._sounds[cid] = _SoundEntry( sound=sound, bus=bus, keeper=None, streaming=True ) return cid if container == "unknown": path = stream.path if stream is not None else "<no-path>" raise InvalidStreamError( f"Native streaming open: could not detect audio container for {path!r}. " "Expected WAV/OGG/MP3/FLAC header." ) # container in {"wav", "pcm"} (or stream is None: legacy callers # without a stream object get the chunk-fed PCM ring path). try: rb = _me.StreamSource( self._engine, sample_rate=self._sample_rate, channels=self._nchannels, buffer_seconds=buffer_seconds, format="s16", ) sound = _me.Sound.from_stream(self._engine, rb, group=group) except Exception as exc: raise AudioError(f"open_stream failed: {exc}") from exc sound.set_volume(_db_to_linear(volume_db)) try: sound.start() except Exception as exc: sound.shutdown() rb.shutdown() raise AudioError(f"open_stream sound.start failed: {exc}") from exc cid = _alloc_channel_id() with self._lock: self._sounds[cid] = _SoundEntry( sound=sound, bus=bus, keeper=rb, streaming=True ) return cid
[docs] def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None: """Push raw int16 stereo bytes into the channel's ring buffer. Misaligned chunks (length not a multiple of bytes-per-frame) have the trailing partial frame trimmed. When the ring is full the excess is silently dropped: callers can poll ``entry.keeper.available_write_frames`` if they need to know. """ if not chunk: return with self._lock: entry = self._sounds.get(channel_id) if entry is None or entry.keeper is None: return if not isinstance(entry.keeper, _me.StreamSource): return try: entry.keeper.write(chunk) except Exception as exc: raise_or_warn( exc, key="audio.native.feed_audio_chunk.ring_write_failed", message="MiniaudioBackend.feed_audio_chunk: ring write failed", )
# ------------------------------------------------------------------ # 3D listener: drives the native ma_engine spatializer. # ------------------------------------------------------------------
[docs] def set_listener_position(self, x: float, y: float, z: float) -> None: """Forward listener position to the native ma_engine spatializer.""" self._engine.set_listener_position(float(x), float(y), float(z))
[docs] def set_listener_velocity(self, x: float, y: float, z: float) -> None: """Forward listener velocity (m/s) to the native engine for Doppler.""" self._engine.set_listener_velocity(float(x), float(y), float(z))
[docs] def set_listener_direction(self, x: float, y: float, z: float) -> None: """Forward listener forward vector to the native engine.""" self._engine.set_listener_direction(float(x), float(y), float(z))
[docs] def set_listener_world_up(self, x: float, y: float, z: float) -> None: """Forward listener world-up vector to the native engine.""" self._engine.set_listener_world_up(float(x), float(y), float(z))
[docs] def shutdown(self) -> None: with self._lock: entries = list(self._sounds.values()) self._sounds.clear() chains = list(self._effect_chains.values()) self._effect_chains.clear() groups = list(self._groups.values()) self._groups.clear() for entry in entries: try: entry.sound.shutdown() except Exception as exc: raise_or_warn( exc, key="audio.native.shutdown.sound_cleanup_failed", message="MiniaudioBackend.shutdown: sound cleanup failed", ) for chain in chains: try: chain.shutdown() except Exception as exc: raise_or_warn( exc, key="audio.native.shutdown.effect_chain_cleanup_failed", message="MiniaudioBackend.shutdown: effect chain cleanup failed", ) for group in groups: try: group.shutdown() except Exception as exc: raise_or_warn( exc, key="audio.native.shutdown.group_cleanup_failed", message="MiniaudioBackend.shutdown: group cleanup failed", ) try: self._engine.shutdown() except Exception as exc: raise_or_warn( exc, key="audio.native.shutdown.engine_cleanup_failed", message="MiniaudioBackend.shutdown: engine cleanup failed", )
[docs] def list_capabilities(self) -> frozenset[Capability]: return _DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES
[docs] def sync_bus_layout(self, layout: AudioBusLayout) -> None: """Reconcile native ``ma_sound_group`` volumes + effect chains against `layout`. Cheap and idempotent: pushes only on change. Called by the audio server every frame; safe to call manually. """ for bus in layout.buses: # Bus volume / mute / routing. current = (bus.volume_db, bus.mute, bus.send_to) previous = self._bus_snapshot.get(bus.name) if previous != current: self._bus_snapshot[bus.name] = current group = self._ensure_group(bus.name) if group is not None: effective = bus.effective_linear_volume try: group.set_volume(effective) except Exception as exc: raise_or_warn( exc, key="audio.native.sync_bus_layout.set_volume_failed", message=f"MiniaudioBackend.sync_bus_layout: set_volume failed for bus {bus.name!r}", ) # Effect chain reconciliation. Chain rebuilds only on signature # change; cheap if the effects list is unchanged. group = self._ensure_group(bus.name) if group is None: continue chain = self._effect_chains.get(bus.name) if chain is None: # Only allocate a chain object when there's at least one # effect to install (avoids per-bus overhead when buses # have no effects). if not bus.effects: continue chain = _NativeEffectChain(self._engine, group, _me.engine_endpoint(self._engine)) self._effect_chains[bus.name] = chain try: chain.reconcile(bus.effects) except Exception as exc: raise_or_warn( exc, key="audio.native.sync_bus_layout.effect_chain_rebuild_failed", message=f"MiniaudioBackend.sync_bus_layout: effect chain rebuild failed for bus {bus.name!r}", )